CMS 3D CMS Logo

FedRawDataInputSource.cc
Go to the documentation of this file.
1 #include <fcntl.h>
2 #include <iomanip>
3 #include <iostream>
4 #include <memory>
5 #include <sstream>
6 #include <sys/types.h>
7 #include <sys/file.h>
8 #include <sys/time.h>
9 #include <unistd.h>
10 #include <vector>
11 #include <fstream>
12 #include <zlib.h>
13 #include <cstdio>
14 #include <chrono>
15 
16 #include <boost/algorithm/string.hpp>
17 #include <boost/filesystem/fstream.hpp>
18 
19 
24 
26 
33 
35 
37 
41 
43 
48 
49 //JSON file reader
51 
52 #include <boost/lexical_cast.hpp>
53 
54 using namespace jsoncollector;
55 
// Constructor: member-initializer list followed by the body that resolves the run number,
// locates the JSON definition file, validates the buffer configuration and starts the
// reader worker threads.
// NOTE(review): the first line of the signature and several statements are not present in
// this listing; the notes below only cover statements that are visible here.
57  edm::InputSourceDescription const& desc) :
58  edm::RawInputSource(pset, desc),
// NOTE(review): getenv() can return a null pointer, and constructing std::string from a
// null pointer is undefined behaviour. The value stored here also appears to be replaced
// unconditionally by the assignment at listing line 106, so a configured "buDefPath"
// would have no effect -- confirm against the full file.
59  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
// Both sizes are configured in MB and converted to bytes (1048576 = 1024*1024).
60  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
61  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
62  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
63  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
64  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
65  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
66  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
67  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
68  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
69  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
70  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
71  runNumber_(edm::Service<evf::EvFDaqDirector>()->getRunNumber()),
72  fuOutputDir_(std::string()),
73  daqProvenanceHelper_(edm::TypeID(typeid(FEDRawDataCollection))),
74  eventID_(),
75  processHistoryID_(),
76  currentLumiSection_(0),
77  tcds_pointer_(nullptr),
78  eventsThisLumi_(0),
79  dpd_(nullptr)
80 {
81  char thishost[256];
// NOTE(review): the return value is not checked and the buffer is not explicitly
// NUL-terminated; POSIX leaves termination unspecified when the host name is truncated.
82  gethostname(thishost, 255);
83  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
84  << std::endl << (eventChunkSize_/1048576)
85  << " MB on host " << thishost;
86
// In file-list mode the run number comes from initFileList(); a negative result is only
// fatal when loop mode is off, in which case it also overrides the director's run number.
87  long autoRunNumber = -1;
88  if (fileListMode_) {
89  autoRunNumber = initFileList();
90  if (!fileListLoopMode_) {
91  if (autoRunNumber<0)
92  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
93  //override run number
94  runNumber_ = (edm::RunNumber_t)autoRunNumber;
95  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
96  }
97  }
98
100  setNewRun();
101  //todo:autodetect from file name (assert if names differ)
104
// Definition-file lookup order: $CMSSW_BASE, then $CMSSW_RELEASE_BASE, then the bare
// relative suffix. stat() returns non-zero when the candidate path is not accessible.
// NOTE(review): both getenv() results are used without a null check (see note above).
105  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
106  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
107  struct stat statbuf;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
110  if (stat(defPath_.c_str(), &statbuf)) {
111  defPath_ = defPathSuffix;
112  }
113  }
114
// NOTE(review): dpd_ is a raw owning pointer; no matching delete is visible in this
// listing -- confirm it is released somewhere.
115  dpd_ = new DataPointDefinition();
116  std::string defLabel = "data";
117  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
118
119  //make sure that chunk size is N * block size
124
125  if (!numBuffers_)
126  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
127  "no reading enabled with numBuffers parameter 0";
128
132
// A failed hardware CRC32C self-test is only logged; construction continues.
133  if (!crc32c_hw_test())
134  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
135
136  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
137  if (fileListMode_) {
138  try {
// NOTE(review): the exception is caught by value (copy) and `e` is unused; catching by
// const reference would avoid the copy.
140  } catch(cms::Exception e) {
141  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
142  }
143  }
144  else {
146  if (!fms_) {
147  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
148  }
149  }
150
// NOTE(review): the statement below builds a cms::Exception temporary but has no `throw`,
// so a null daqDirector_ is not reported and execution continues. Compare with the
// fms_ check above, which does throw.
152  if (!daqDirector_)
153  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
154
155  //set DaqDirector to delete files in preGlobalEndLumi callback
157  if (fms_) {
159  fms_->setInputSource(this);
162  }
163  //should delete chunks when run stops
164  for (unsigned int i=0;i<numBuffers_;i++) {
166  }
167
168  quit_threads_ = false;
169
// One worker thread per concurrent read. startupLock_ is held while the thread is created
// and released only inside wait(), so each iteration blocks until something notifies
// startupCv_.
// NOTE(review): wait() is called without a predicate, so a spurious wakeup would let the
// loop continue before the worker has signalled.
170  for (unsigned int i=0;i<numConcurrentReads_;i++)
171  {
172  std::unique_lock<std::mutex> lk(startupLock_);
173  //issue a memory fence here and in threads (constructor was segfaulting without this)
174  thread_quit_signal.push_back(false);
175  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
176  cvReader_.push_back(new std::condition_variable);
177  tid_active_.push_back(0);
178  threadInit_.store(false,std::memory_order_release);
179  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
180  startupCv_.wait(lk);
181  }
182
183  runAuxiliary()->setProcessHistoryID(processHistoryID_);
184 }
185 
187 {
188  quit_threads_=true;
189 
190  //delete any remaining open files
191  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
192  deleteFile(it->second->fileName_);
193  delete it->second;
194  }
196  readSupervisorThread_->join();
197  }
198  else {
199  //join aux threads in case the supervisor thread was not started
200  for (unsigned int i=0;i<workerThreads_.size();i++) {
201  std::unique_lock<std::mutex> lk(mReader_);
202  thread_quit_signal[i]=true;
203  cvReader_[i]->notify_one();
204  lk.unlock();
205  workerThreads_[i]->join();
206  delete workerThreads_[i];
207  }
208  }
209  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
210  /*
211  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
212  InputChunk *ch;
213  while (!freeChunks_.try_pop(ch)) {}
214  delete ch;
215  }
216  */
217 }
218 
220 {
222  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
223  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
224  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
225  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
226  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
227  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
228  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
229  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
230  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
231  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
232  desc.setAllowAnything();
233  descriptions.add("source", desc);
234 }
235 
237 {
239  {
240  //late init of directory variable
242 
243  //this thread opens new files and dispatches reading to worker readers
244  //threadInit_.store(false,std::memory_order_release);
245  std::unique_lock<std::mutex> lk(startupLock_);
246  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
248  startupCv_.wait(lk);
249  }
250  //signal hltd to start event accounting
251  if (!currentLumiSection_)
254  switch (nextEvent() ) {
256  //maybe create EoL file in working directory before ending run
257  struct stat buf;
258  if ( currentLumiSection_ > 0) {
259  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
260  if (eolFound) {
262  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
263  if ( !found ) {
265  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
266  close(eol_fd);
268  }
269  }
270  }
271  //also create EoR file in FU data directory
272  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
273  if (!eorFound) {
274  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_=0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return false;
282  }
284  //this is not reachable
285  return true;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return true;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_=0;
297  maybeOpenNewLumiSection( event_->lumi() );
298  }
299  }
302  else
303  eventRunNumber_=event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return true;
309  }
310  }
311 }
312 
313 void FedRawDataInputSource::createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
314 {
315  //used for backpressure mechanisms and monitoring
316  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
317  struct stat buf;
318  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
319  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
320  close(bol_fd);
321  }
322 }
323 
324 void FedRawDataInputSource::maybeOpenNewLumiSection(const uint32_t lumiSection)
325 {
327  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
328 
329  if ( currentLumiSection_ > 0) {
330  const std::string fuEoLS =
332  struct stat buf;
333  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
334  if ( !found ) {
336  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
337  close(eol_fd);
338  createBoLSFile(lumiSection,false);
340  }
341  }
342  else createBoLSFile(lumiSection,true);//needed for initial lumisection
343 
344  currentLumiSection_ = lumiSection;
345 
347 
348  timeval tv;
349  gettimeofday(&tv, nullptr);
350  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
351 
352  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
354  runAuxiliary()->run(),
355  lumiSection, lsopentime,
357 
358  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
359  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
360 
361  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
362  }
363 }
364 
366 {
368  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
369  {
370  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
371  }
372  return status;
373 }
374 
376 {
377 
379  if (!currentFile_)
380  {
383  if (!fileQueue_.try_pop(currentFile_))
384  {
385  //sleep until wakeup (only in single-buffer mode) or timeout
386  std::unique_lock<std::mutex> lkw(mWakeup_);
387  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
389  }
390  status = currentFile_->status_;
391  if ( status == evf::EvFDaqDirector::runEnded)
392  {
394  delete currentFile_;
395  currentFile_=nullptr;
396  return status;
397  }
398  else if ( status == evf::EvFDaqDirector::runAbort)
399  {
400  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
401  }
402  else if (status == evf::EvFDaqDirector::newLumi)
403  {
405  if (getLSFromFilename_) {
408  eventsThisLumi_=0;
410  }
411  }
412  else {//let this be picked up from next event
414  }
415 
416  delete currentFile_;
417  currentFile_=nullptr;
418  return status;
419  }
420  else if (status == evf::EvFDaqDirector::newFile) {
422  }
423  else
424  assert(false);
425  }
427 
428  //file is empty
429  if (!currentFile_->fileSize_) {
431  //try to open new lumi
432  assert(currentFile_->nChunks_==0);
433  if (getLSFromFilename_)
436  eventsThisLumi_=0;
438  }
439  //immediately delete empty file
441  delete currentFile_;
442  currentFile_=nullptr;
444  }
445 
446  //file is finished
449  //release last chunk (it is never released elsewhere)
452  {
453  throw cms::Exception("FedRawDataInputSource::getNextEvent")
454  << "Fully processed " << currentFile_->nProcessed_
455  << " from the file " << currentFile_->fileName_
456  << " but according to BU JSON there should be "
457  << currentFile_->nEvents_ << " events";
458  }
459  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
460  if (singleBufferMode_) {
461  std::unique_lock<std::mutex> lkw(mWakeup_);
462  cvWakeup_.notify_one();
463  }
466  //put the file in pending delete list;
467  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
468  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
469  }
470  else {
471  //in single-thread and stream jobs, events are already processed
473  delete currentFile_;
474  }
475  currentFile_=nullptr;
477  }
478 
479 
480  //file is too short
482  {
483  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
484  "Premature end of input file while reading event header";
485  }
486  if (singleBufferMode_) {
487 
488  //should already be there
491  usleep(10000);
493  }
495 
496  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
497 
498  //conditions when read amount is not sufficient for the header to fit
499  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
501  {
503 
504  if (detectedFRDversion_==0) {
505  detectedFRDversion_=*((uint32*)dataPosition);
506  if (detectedFRDversion_>5)
507  throw cms::Exception("FedRawDataInputSource::getNextEvent")
508  << "Unknown FRD version -: " << detectedFRDversion_;
509  assert(detectedFRDversion_>=1);
510  }
511 
512  //recalculate chunk position
513  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
514  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
515  {
516  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
517  "Premature end of input file while reading event header";
518  }
519  }
520 
521  event_.reset( new FRDEventMsgView(dataPosition) );
522  if (event_->size()>eventChunkSize_) {
523  throw cms::Exception("FedRawDataInputSource::getNextEvent")
524  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
525  << " run:" << event_->run() << " of size:" << event_->size()
526  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
527  }
528 
529  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
530 
532  {
533  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
534  "Premature end of input file while reading event data";
535  }
536  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
538  //recalculate chunk position
539  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
540  event_.reset( new FRDEventMsgView(dataPosition) );
541  }
542  currentFile_->bufferPosition_ += event_->size();
543  currentFile_->chunkPosition_ += event_->size();
544  //last chunk is released when this function is invoked next time
545 
546  }
547  //multibuffer mode:
548  else
549  {
550  //wait for the current chunk to become added to the vector
553  usleep(10000);
555  }
557 
558  //check if header is at the boundary of two chunks
559  chunkIsFree_ = false;
560  unsigned char *dataPosition;
561 
562  //read header, copy it to a single chunk if necessary
563  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
564 
565  event_.reset( new FRDEventMsgView(dataPosition) );
566  if (event_->size()>eventChunkSize_) {
567  throw cms::Exception("FedRawDataInputSource::getNextEvent")
568  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
569  << " run:" << event_->run() << " of size:" << event_->size()
570  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
571  }
572 
573  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
574 
576  {
577  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
578  "Premature end of input file while reading event data";
579  }
580 
581  if (chunkEnd) {
582  //header was at the chunk boundary, we will have to move payload as well
583  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
584  chunkIsFree_ = true;
585  }
586  else {
587  //header was contiguous, but check if payload fits the chunk
588  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
589  //rewind to header start position
590  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
591  //copy event to a chunk start and move pointers
592  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
593  assert(chunkEnd);
594  chunkIsFree_=true;
595  //header is moved
596  event_.reset( new FRDEventMsgView(dataPosition) );
597  }
598  else {
599  //everything is in a single chunk, only move pointers forward
600  chunkEnd = currentFile_->advance(dataPosition,msgSize);
601  assert(!chunkEnd);
602  chunkIsFree_=false;
603  }
604  }
605  }//end multibuffer mode
607 
608  if (verifyChecksum_ && event_->version() >= 5)
609  {
610  uint32_t crc=0;
611  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
612  if ( crc != event_->crc32c() ) {
614  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
615  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
616  " but calculated 0x" << crc;
617  }
618  }
619  else if ( verifyAdler32_ && event_->version() >= 3)
620  {
621  uint32_t adler = adler32(0L,Z_NULL,0);
622  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
623 
624  if ( adler != event_->adler32() ) {
626  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
627  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
628  " but calculated 0x" << adler;
629  }
630  }
632 
634 
636 }
637 
// Body of the routine that removes one raw input file (the signature line is not present
// in this listing; `fileName` is its parameter). Removal is skipped in file-list mode.
// On an exception the removal is retried once after 100 ms and a second failure is
// deliberately ignored.
639 {
640  //no deletion in file list mode
641  if (fileListMode_) return;
642
643  const boost::filesystem::path filePath(fileName);
644  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
645  try {
646  //sometimes this fails but file gets deleted
647  boost::filesystem::remove(filePath);
648  }
649  catch (const boost::filesystem::filesystem_error& ex)
650  {
651  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
652  << ". Trying again.";
653  usleep(100000);
// Retry once. Only filesystem_error is swallowed here; any other exception type thrown
// by the retry would propagate to the caller.
654  try {
655  boost::filesystem::remove(filePath);
656  }
657  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
658  }
// NOTE(review): caught by non-const reference although `ex` is only read.
659  catch (std::exception& ex)
660  {
661  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
662  << ". Trying again.";
663  usleep(100000);
664  try {
665  boost::filesystem::remove(filePath);
666  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
667  }
668 }
669 
670 
672 {
674  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
675  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
676 
677  if (useL1EventID_){
679  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
682  makeEvent(eventPrincipal, aux);
683  }
684  else if(tcds_pointer_==nullptr){
685  assert(GTPEventID_);
687  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
690  makeEvent(eventPrincipal, aux);
691  }
692  else{
693  const FEDHeader fedHeader(tcds_pointer_);
694  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
697  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
700  makeEvent(eventPrincipal, aux);
701  }
702 
703 
704 
705  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
706 
707  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
709 
710  eventsThisLumi_++;
712 
713  //resize vector if needed
714  while (streamFileTracker_.size() <= eventPrincipal.streamID())
715  streamFileTracker_.push_back(-1);
716 
718 
719  //this old file check runs no more often than every 10 events
720  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
721  //delete files that are not in processing
722  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
723  auto it = filesToDelete_.begin();
724  while (it!=filesToDelete_.end()) {
725  bool fileIsBeingProcessed = false;
726  for (unsigned int i=0;i<nStreams_;i++) {
727  if (it->first == streamFileTracker_.at(i)) {
728  fileIsBeingProcessed = true;
729  break;
730  }
731  }
732  if (!fileIsBeingProcessed) {
733  deleteFile(it->second->fileName_);
734  delete it->second;
735  it = filesToDelete_.erase(it);
736  }
737  else it++;
738  }
739 
740  }
742  chunkIsFree_=false;
744  return;
745 }
746 
// Body of the routine that splits the current event payload into per-FED fragments and
// copies each one into `rawData` (the signature line and the declaration of `time` are
// not present in this listing). The payload is walked backwards: a trailer is read at the
// current end, its fragment length gives the FED size, and the cursor steps back to that
// FED's header. Returns the event timestamp: wall-clock time by default, replaced by the
// GPS time when the GTP FED is present.
748 {
750  timeval stv;
751  gettimeofday(&stv,nullptr);
// Pack seconds into the upper 32 bits and microseconds into the lower 32 bits.
752  time = stv.tv_sec;
753  time = (time << 32) + stv.tv_usec;
754  edm::Timestamp tstamp(time);
755
756  uint32_t eventSize = event_->eventSize();
757  unsigned char* event = (unsigned char*)event_->payload();
758  GTPEventID_=0;
759  tcds_pointer_ = nullptr;
// `eventSize` doubles as the backwards cursor: after each step it is the offset of the
// FED header currently being processed.
// NOTE(review): the two size checks in the loop are assert()s only. In a build with
// NDEBUG they vanish, and a corrupt trailer length would make the unsigned subtraction
// wrap and the memcpy below read outside the payload. The expression
// `fedSize - FEDHeader::length` can itself wrap when fedSize is smaller than the header
// length (assuming FEDHeader::length is unsigned -- confirm).
760  while (eventSize > 0) {
761  assert(eventSize>=FEDTrailer::length);
762  eventSize -= FEDTrailer::length;
763  const FEDTrailer fedTrailer(event + eventSize);
764  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
765  assert(eventSize>=fedSize - FEDHeader::length);
766  eventSize -= (fedSize - FEDHeader::length);
767  const FEDHeader fedHeader(event + eventSize);
768  const uint16_t fedId = fedHeader.sourceID();
769  if(fedId>FEDNumbering::MAXFEDID)
770  {
771  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
772  }
// tcds_pointer_ is left pointing into the event_ payload buffer, not at a copy, so it is
// only meaningful while that buffer is unchanged.
773  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
774  tcds_pointer_ = event + eventSize;
775  }
776  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
777  if (evf::evtn::evm_board_sense(event + eventSize,fedSize))
778  GTPEventID_ = evf::evtn::get(event + eventSize,true);
779  else
780  GTPEventID_ = evf::evtn::get(event + eventSize,false);
781  //evf::evtn::evm_board_setformat(fedSize);
782  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
783  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
784  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
785  }
786  //take event ID from GTPE FED
// Only used as a fallback: skipped when an event ID has already been set in this call.
787  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
788  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
789  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
790  }
791  }
// Copy the whole fragment (header through trailer) into the collection slot for this FED.
792  FEDRawData& fedData = rawData.FEDData(fedId);
793  fedData.resize(fedSize);
794  memcpy(fedData.data(), event + eventSize, fedSize);
795  }
796  assert(eventSize == 0);
797
798  return tstamp;
799 }
800 
// Body of the routine that claims a BU JSON file and returns the event count stored in it
// (the signature line and the declarations of `data`, `jsonSourcePath`, `jsonDestPath`
// and `reader` are not present in this listing). Steps: copy the JSON to a destination
// name carrying this process's PID, remove the source, parse the copy, extract the
// "NEvents" field, and return it converted to int. Every handled failure is logged and
// results in a return value of -1.
802 {
804  try {
805  // assemble json destination path
807
808  //TODO:should be ported to use fffnaming
// Destination name is "<source stem>_pid<PID zero-padded to 5 digits>.jsn".
809  std::ostringstream fileNameWithPID;
810  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
811  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
812  jsonDestPath /= fileNameWithPID.str();
813
814  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
815  << jsonDestPath;
816  try {
817  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
818  }
819  catch (const boost::filesystem::filesystem_error& ex)
820  {
821  // Input dir gone?
822  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
823  // << " Maybe the file is not yet visible by FU. Trying again in one second";
824  sleep(1);
// The retry is not wrapped in its own try block: if it throws filesystem_error again, the
// outer handler near the end of this routine catches it.
825  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
826  }
828
// Removing the source is best-effort: failures are logged and parsing of the copy proceeds.
829  try {
830  //sometimes this fails but file gets deleted
831  boost::filesystem::remove(jsonSourcePath);
832  }
833  catch (const boost::filesystem::filesystem_error& ex)
834  {
835  // Input dir gone?
836  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
837  }
838  catch (std::exception& ex)
839  {
840  // Input dir gone?
841  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
842  }
843
844  boost::filesystem::ifstream ij(jsonDestPath);
845  Json::Value deserializeRoot;
847
848  std::stringstream ss;
849  ss << ij.rdbuf();
850  if (!reader.parse(ss.str(), deserializeRoot)) {
851  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
852  << "\nERROR:\n" << reader.getFormatedErrorMessages()
853  << "CONTENT:\n" << ss.str()<<".";
854  throw std::runtime_error("Cannot deserialize input JSON file");
855  }
856
857  //read BU JSON
859  DataPoint dp;
860  dp.deserialize(deserializeRoot);
// Look up the position of "NEvents" in the definition and take the value at the same
// index in the data point. The loop does not stop at the first hit, so the last matching
// name wins.
861  bool success = false;
862  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
863  if (dpd_->getNames().at(i)=="NEvents")
864  if (i<dp.getData().size()) {
865  data = dp.getData()[i];
866  success=true;
867  }
868  }
// Fallback when the name lookup fails: use the first data entry if there is one.
869  if (!success) {
870  if (!dp.getData().empty())
871  data = dp.getData()[0];
872  else
// NOTE(review): `data` is not assigned on this path within the visible code, so streaming
// it into the message adds nothing useful -- confirm its initial value.
873  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
874  " error reading number of events from BU JSON -: No input value " << data;
875  }
876  return boost::lexical_cast<int>(data);
877
878  }
879  catch (const boost::filesystem::filesystem_error& ex)
880  {
881  // Input dir gone?
883  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
884  }
// NOTE(review): caught by value, which copies and slices the exception object; a const
// reference would preserve the dynamic type.
885  catch (std::runtime_error e)
886  {
887  // Another process grabbed the file and NFS did not register this
889  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
890  }
891
892  catch( boost::bad_lexical_cast const& ) {
893  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
894  << "Input value is -: " << data;
895  }
896
// NOTE(review): catching std::exception by value slices any derived exception down to the
// base class, so e.what() below reports the generic base-class text rather than the
// original message. Catching by const reference would keep the message.
897  catch (std::exception e)
898  {
899  // BU run directory disappeared?
901  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
902  }
903
// Reached only after one of the handlers above has logged a failure.
904  return -1;
905 }
906 
908 {}
909 
910 
912 {
913  bool stop=false;
914  unsigned int currentLumiSection = 0;
915  //threadInit_.exchange(true,std::memory_order_acquire);
916 
917  {
918  std::unique_lock<std::mutex> lk(startupLock_);
919  startupCv_.notify_one();
920  }
921 
922  uint32_t ls=0;
923  uint32_t monLS=1;
924  uint32_t lockCount=0;
925  uint64_t sumLockWaitTimeUs=0.;
926 
927  while (!stop) {
928 
929  //wait for at least one free thread and chunk
930  int counter=0;
932  {
933  //report state to monitoring
934  if (fms_) {
935  bool copy_active=false;
936  for (auto j : tid_active_) if (j) copy_active=true;
938  else if (freeChunks_.empty()) {
939  if (copy_active)
941  else
943  }
944  else {
945  if (copy_active)
947  else
949  }
950  }
951  std::unique_lock<std::mutex> lkw(mWakeup_);
952  //sleep until woken up by condition or a timeout
953  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
954  counter++;
955  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
956  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
957  }
958  else {
959  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
960  }
961  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
962  }
963 
964  if (stop) break;
965 
966  //look for a new file
967  std::string nextFile;
968  uint32_t fileSize;
969 
970  if (fms_) {
974  }
975 
977 
978  while (status == evf::EvFDaqDirector::noFile) {
979  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
980  stop=true;
981  break;
982  }
983 
984  uint64_t thisLockWaitTimeUs=0.;
985  if (fileListMode_) {
986  //return LS if LS not set, otherwise return file
987  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
988  }
989  else
990  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
991 
993 
994  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
995 
996  //monitoring of lock wait time
997  if (thisLockWaitTimeUs>0.)
998  sumLockWaitTimeUs+=thisLockWaitTimeUs;
999  lockCount++;
1000  if (ls>monLS) {
1001  monLS=ls;
1002  if (lockCount)
1003  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1004  lockCount=0;
1005  sumLockWaitTimeUs=0;
1006  }
1007 
1008  //check again for any remaining index/EoLS files after EoR file is seen
1009  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1011  usleep(100000);
1012  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1013  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1014  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1015  }
1016 
1017  if ( status == evf::EvFDaqDirector::runEnded) {
1019  stop=true;
1020  break;
1021  }
1022 
1023  //error from filelocking function
1024  if (status == evf::EvFDaqDirector::runAbort) {
1026  stop=true;
1027  break;
1028  }
1029  //queue new lumisection
1030  if( getLSFromFilename_ && ls > currentLumiSection) {
1031  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1032  currentLumiSection = ls;
1033  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1034  }
1035 
1036  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1037  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1039  stop=true;
1040  break;
1041  }
1042 
1043  int dbgcount=0;
1044  if (status == evf::EvFDaqDirector::noFile) {
1046  dbgcount++;
1047  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1048  usleep(100000);
1049  }
1050  }
1051  if ( status == evf::EvFDaqDirector::newFile ) {
1053  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1054 
1055 
1056  boost::filesystem::path rawFilePath(nextFile);
1057  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1058 
1059  struct stat st;
1060  int stat_res = stat(rawFile.c_str(),&st);
1061  if (stat_res==-1) {
1062  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1063  setExceptionState_=true;
1064  break;
1065  }
1066  fileSize=st.st_size;
1067 
1068  if (fms_) {
1072  }
1073  int eventsInNewFile;
1074  if (fileListMode_) {
1075  if (fileSize==0) eventsInNewFile=0;
1076  else eventsInNewFile=-1;
1077  }
1078  else {
1079  eventsInNewFile = grabNextJsonFile(nextFile);
1080  assert( eventsInNewFile>=0 );
1081  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1082  }
1083 
1084  if (!singleBufferMode_) {
1085  //calculate number of needed chunks
1086  unsigned int neededChunks = fileSize/eventChunkSize_;
1087  if (fileSize%eventChunkSize_) neededChunks++;
1088 
1089  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1091  fileQueue_.push(newInputFile);
1092 
1093  for (unsigned int i=0;i<neededChunks;i++) {
1094 
1095  if (fms_) {
1096  bool copy_active=false;
1097  for (auto j : tid_active_) if (j) copy_active=true;
1098  if (copy_active)
1100  else
1102  }
1103  //get thread
1104  unsigned int newTid = 0xffffffff;
1105  while (!workerPool_.try_pop(newTid)) {
1106  usleep(100000);
1107  }
1108 
1109  if (fms_) {
1110  bool copy_active=false;
1111  for (auto j : tid_active_) if (j) copy_active=true;
1112  if (copy_active)
1114  else
1116  }
1117  InputChunk * newChunk = nullptr;
1118  while (!freeChunks_.try_pop(newChunk)) {
1119  usleep(100000);
1120  if (quit_threads_.load(std::memory_order_relaxed)) break;
1121  }
1122 
1123  if (newChunk == nullptr) {
1124  //return unused tid if we received shutdown (nullptr chunk)
1125  if (newTid!=0xffffffff) workerPool_.push(newTid);
1126  stop = true;
1127  break;
1128  }
1130 
1131  std::unique_lock<std::mutex> lk(mReader_);
1132 
1133  unsigned int toRead = eventChunkSize_;
1134  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1135  newChunk->reset(i*eventChunkSize_,toRead,i);
1136 
1137  workerJob_[newTid].first=newInputFile;
1138  workerJob_[newTid].second=newChunk;
1139 
1140  //wake up the worker thread
1141  cvReader_[newTid]->notify_one();
1142  }
1143  }
1144  else {
1145  if (!eventsInNewFile) {
1146  //still queue file for lumi update
1147  std::unique_lock<std::mutex> lkw(mWakeup_);
1148  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1150  fileQueue_.push(newInputFile);
1151  cvWakeup_.notify_one();
1152  break;
1153  }
1154  //in single-buffer mode put single chunk in the file and let the main thread read the file
1155  InputChunk * newChunk;
1156  //should be available immediately
1157  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1158 
1159  std::unique_lock<std::mutex> lkw(mWakeup_);
1160 
1161  unsigned int toRead = eventChunkSize_;
1162  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1163  newChunk->reset(0,toRead,0);
1164  newChunk->readComplete_=true;
1165 
1166  //push file and wakeup main thread
1167  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1168  newInputFile->chunks_[0]=newChunk;
1170  fileQueue_.push(newInputFile);
1171  cvWakeup_.notify_one();
1172  }
1173  }
1174  }
1176  //make sure threads finish reading
1177  unsigned numFinishedThreads = 0;
1178  while (numFinishedThreads < workerThreads_.size()) {
1179  unsigned int tid;
1180  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1181  std::unique_lock<std::mutex> lk(mReader_);
1182  thread_quit_signal[tid]=true;
1183  cvReader_[tid]->notify_one();
1184  numFinishedThreads++;
1185  }
1186  for (unsigned int i=0;i<workerThreads_.size();i++) {
1187  workerThreads_[i]->join();
1188  delete workerThreads_[i];
1189  }
1190 }
1191 
// NOTE(review): the signature line (listing line 1192) is missing from this extract. The error text
// below ("readWorker failed to open file") and the use of `tid` suggest this is
// FedRawDataInputSource::readWorker(unsigned int tid) -- confirm against the real file.
//
// Body of a reader worker loop. Each iteration: mark this worker idle, publish its id on workerPool_,
// sleep on cvReader_[tid] until woken, then either exit (quit signal) or read the chunk described by
// workerJob_[tid] from disk into chunk->buf_ and attach it to the owning InputFile.
1193 {
1194  bool init = true;
1195  threadInit_.exchange(true,std::memory_order_acquire);
1196 
1197  while (true) {
1198 
1199  tid_active_[tid]=false;
     // lk is taken here and no explicit unlock is visible in this extract, so apart from the wait()
     // below the mutex appears to stay held for the rest of the iteration, including the file read.
1200  std::unique_lock<std::mutex> lk(mReader_);
     // NOTE(review): `.first` is cleared twice and `.second` is never cleared here; the second
     // statement looks like it was meant to reset `.second` -- confirm.
1201  workerJob_[tid].first=nullptr;
1202  workerJob_[tid].first=nullptr;
1203 
1204  assert(!thread_quit_signal[tid]);//should never get it here
     // Advertise this worker as available; other code in this file pops ids from workerPool_.
1205  workerPool_.push(tid);
1206 
     // First iteration only: signal startupCv_ under startupLock_. The inner `lk` shadows the outer one.
1207  if (init) {
1208  std::unique_lock<std::mutex> lk(startupLock_);
1209  init = false;
1210  startupCv_.notify_one();
1211  }
     // NOTE(review): wait() is called without a predicate; a spurious wakeup would reach the
     // workerJob_ assert below with no job assigned -- confirm whether that is acceptable.
1212  cvReader_[tid]->wait(lk);
1213 
1214  if (thread_quit_signal[tid]) return;
1215  tid_active_[tid]=true;
1216 
1217  InputFile * file;
1218  InputChunk * chunk;
1219 
     // A wakeup without a quit signal must come with both a file and a chunk assigned.
1220  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1221 
1222  file = workerJob_[tid].first;
1223  chunk = workerJob_[tid].second;
1224 
     // NOTE(review): lseek() is issued before the descriptor is validated, and only the descriptor
     // (not `pos`) is checked afterwards, although the error text also mentions a seek failure.
1225  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1226  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1227 
1228 
1229  if (fileDescriptor>=0)
1230  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1231  else
1232  {
1233  edm::LogError("FedRawDataInputSource") <<
1234  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1235  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
     // Flag the failure and leave the worker function; the id is not pushed back to workerPool_.
1236  setExceptionState_=true;
1237  return;
1238 
1239  }
1240 
     // bufferLeft counts the bytes stored in chunk->buf_ so far (it is the write offset).
     // NOTE(review): listing line 1242 is missing here; `start` used further down is presumably declared on it.
1241  unsigned int bufferLeft = 0;
     // Read at most readBlocks_ blocks of eventChunkBlock_ bytes, appending each to the buffer.
1243  for (unsigned int i=0;i<readBlocks_;i++)
1244  {
1245  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1246  if ( last > 0 )
1247  bufferLeft+=last;
     // A short (or failed) read ends the loop; the assert ties the byte count to chunk->usedSize_.
1248  if (last < eventChunkBlock_) {
1249  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1250  break;
1251  }
1252  }
     // NOTE(review): listing line 1253 is missing here; `end` is presumably declared on it.
1254  auto diff = end-start;
1255  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
     // The rate printed is (bytes >> 20) divided by milliseconds, i.e. MB per ms, labelled "GB/s".
1256  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1257  close(fileDescriptor);
1258 
     // If no version has been recorded yet and this chunk starts at file offset 0, take the first
     // 32-bit word of the buffer as the version; values above 5 trip the assert.
1259  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1260  assert(detectedFRDversion_<=5);
1261  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1262  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1263 
1264  }
1265 }
1266 
// NOTE(review): the signature line (listing line 1267) is missing from this extract. The exception
// category below and the parent_->threadError() calls elsewhere in this file suggest this is
// FedRawDataInputSource::threadError() -- confirm.
//
// Sets quit_threads_ and then always throws a cms::Exception; this body never returns normally.
1268 {
1269  quit_threads_=true;
1270  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1271 
1272 }
1273 
1274 
1275 inline bool InputFile::advance(unsigned char* & dataPosition, const size_t size)
1276 {
1277  //wait for chunk
1278  while (!waitForChunk(currentChunk_)) {
1279  usleep(100000);
1280  if (parent_->exceptionState()) parent_->threadError();
1281  }
1282 
1283  dataPosition = chunks_[currentChunk_]->buf_+ chunkPosition_;
1284  size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_;
1285 
1286  if (currentLeft < size) {
1287 
1288  //we need next chunk
1289  while (!waitForChunk(currentChunk_+1)) {
1290  usleep(100000);
1291  if (parent_->exceptionState()) parent_->threadError();
1292  }
1293  //copy everything to beginning of the first chunk
1294  dataPosition-=chunkPosition_;
1295  assert(dataPosition==chunks_[currentChunk_]->buf_);
1296  memmove(chunks_[currentChunk_]->buf_, chunks_[currentChunk_]->buf_+chunkPosition_, currentLeft);
1297  memcpy(chunks_[currentChunk_]->buf_ + currentLeft, chunks_[currentChunk_+1]->buf_, size - currentLeft);
1298  //set pointers at the end of the old data position
1299  bufferPosition_+=size;
1300  chunkPosition_=size-currentLeft;
1301  currentChunk_++;
1302  return true;
1303  }
1304  else {
1305  chunkPosition_+=size;
1306  bufferPosition_+=size;
1307  return false;
1308  }
1309 }
1310 
1311 inline void InputFile::moveToPreviousChunk(const size_t size, const size_t offset)
1312 {
1313  //this will fail in case of events that are too large
1314  assert(size < chunks_[currentChunk_]->size_ - chunkPosition_);
1315  assert(size - offset < chunks_[currentChunk_]->size_);
1316  memcpy(chunks_[currentChunk_-1]->buf_+offset,chunks_[currentChunk_]->buf_+chunkPosition_,size);
1317  chunkPosition_+=size;
1318  bufferPosition_+=size;
1319 }
1320 
1321 inline void InputFile::rewindChunk(const size_t size) {
1322  chunkPosition_-=size;
1323  bufferPosition_-=size;
1324 }
1325 
// NOTE(review): the signature line (listing line 1327) is missing from this extract. The exception
// category below suggests this is FedRawDataInputSource::readNextChunkIntoBuffer(InputFile* file) -- confirm.
//
// Fills file->chunks_[0]->buf_ from the file on disk. The descriptor is opened on first use and
// closed once bufferInputRead_ equals file->fileSize_. Throws cms::Exception if the file cannot be opened.
1326 //single-buffer mode file reading
1328 {
1329 
     // Lazily open the file and reset the byte counter when no descriptor is currently open.
1330  if (fileDescriptor_<0) {
1331  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1332  bufferInputRead_ = 0;
1333  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1334  if (fileDescriptor_>=0)
1335  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1336  else
1337  {
1338  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1339  << file->fileName_ << " fd:" << fileDescriptor_;
1340  }
1341  }
1342 
     // Read position at the start of the buffer: refill the buffer from offset 0 in readBlocks_ reads.
1343  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1344  uint32_t existingSize = 0;
1345  for (unsigned int i=0;i<readBlocks_;i++)
1346  {
1347  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
     // NOTE(review): listing line 1348 is missing here (presumably the bufferInputRead_ update).
     // No check of `last` for a read error is visible before it is added to existingSize.
1349  existingSize+=last;
1350  }
1351  }
1352  else {
     // Otherwise keep the unread tail: move it to the front, then read chunkPosition_ more bytes
     // behind it, as `blockcount` full blocks plus one `leftsize` remainder.
1353  const uint32_t chunksize = file->chunkPosition_;
1354  const uint32_t blockcount=chunksize/eventChunkBlock_;
1355  const uint32_t leftsize = chunksize%eventChunkBlock_;
1356  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1357  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1358 
1359  for (uint32_t i=0;i<blockcount;i++) {
1360  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
     // NOTE(review): listing line 1361 is missing here.
1362  existingSize+=last;
1363  }
1364  if (leftsize) {
1365  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
     // NOTE(review): listing line 1366 is missing here; in the visible text `last` is otherwise unused.
1367  }
1368  file->chunkPosition_=0;//data was moved to beginning of the chunk
1369  }
     // Everything has been read: close the descriptor and mark it as closed with -1.
1370  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1371  if (fileDescriptor_!=-1)
1372  {
1373  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1374  close(fileDescriptor_);
1375  fileDescriptor_=-1;
1376  }
1377  }
1378 }
1379 
1380 
// NOTE(review): the signature line (listing line 1381) is missing from this extract; the body uses
// `lumi` and `events`, presumably reportEventsThisLumiInSource(unsigned int lumi, unsigned int events) -- confirm.
//
// Adds `events` to the count stored for `lumi` in sourceEventsReport_, creating the entry if absent.
1382 {
1383 
     // monlock_ is held for the whole lookup and update.
1384  std::lock_guard<std::mutex> lock(monlock_);
1385  auto itr = sourceEventsReport_.find(lumi);
1386  if (itr!=sourceEventsReport_.end())
1387  itr->second+=events;
1388  else
1389  sourceEventsReport_[lumi]=events;
1390 }
1391 
1392 std::pair<bool,unsigned int> FedRawDataInputSource::getEventReport(unsigned int lumi, bool erase)
1393 {
1394  std::lock_guard<std::mutex> lock(monlock_);
1395  auto itr = sourceEventsReport_.find(lumi);
1396  if (itr!=sourceEventsReport_.end()) {
1397  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1398  if (erase)
1399  sourceEventsReport_.erase(itr);
1400  return ret;
1401  }
1402  else
1403  return std::pair<bool,unsigned int>(false,0);
1404 }
1405 
// NOTE(review): the signature line (listing line 1406) is missing from this extract, so the function
// name and declared return type are not visible here. The body returns `rval` (a long) or -1.
//
// Sorts fileNames_ and tries to extract a run number from the first entry's file stem, which is
// expected to start with "run" followed by digits. Returns that number, or -1 when the list is
// empty, the stem does not start with "run", or the digits cannot be parsed.
1407 {
     // Order by the text from the last '/' onwards (the whole string if there is no '/'), ascending.
     // The lambda takes its arguments by value because it overwrites them.
1408  std::sort(fileNames_.begin(),fileNames_.end(),
1409  [](std::string a, std::string b) {
1410  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1411  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1412  return b > a;});
1413 
1414  if (!fileNames_.empty()) {
1415  //get run number from first file in the vector
     // NOTE(review): listing line 1416 is missing here; `fileName` used below is presumably
     // declared on it from the first element of fileNames_ -- confirm.
1417  std::string fileStem = fileName.stem().string();
     // `end` is the index of the first '_' (npos if absent, in which case substr takes the rest).
1418  auto end = fileStem.find("_");
1419  if (fileStem.find("run")==0) {
1420  std::string runStr = fileStem.substr(3,end-3);
1421  try {
1422  //get long to support test run numbers < 2^32
1423  long rval = boost::lexical_cast<long>(runStr);
1424  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1425  return rval;
1426  }
     // A parse failure is only logged; control falls through to the -1 return.
1427  catch( boost::bad_lexical_cast const& ) {
1428  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1429  }
1430  }
1431  }
1432  return -1;
1433 }
1434 
// Hands out the next entry of fileNames_ (file-list mode).
//
// ls        receives the lumisection: parsed from the file stem, or 1 + loopModeIterationInc_ in loop mode.
// nextFile  receives the path with a leading "file://" or "file:" removed.
// fsize, lockWaitTime  are not written by any statement visible here (the assignments are commented out).
//
// NOTE(review): listing lines 1454, 1458 and 1461 are missing from this extract. They presumably hold
// the return statements for the "file handed out" and "list exhausted" cases and the loop-mode
// iteration increment -- confirm against the real file.
1435 evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile(unsigned int& ls, std::string& nextFile, uint32_t& fsize, uint64_t& lockWaitTime)
1436 {
1437  if (fileListIndex_ < fileNames_.size()) {
1438  nextFile = fileNames_[fileListIndex_];
1439  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1440  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1441  boost::filesystem::path fileName = nextFile;
1442  std::string fileStem = fileName.stem().string();
     // Keep the text after "ls" and before the next '_'.
     // NOTE(review): find() returns an index, so these conditions are true whenever the index is
     // non-zero, including npos (not found). If "ls" is absent, npos+2 wraps to 1 and the first
     // character is dropped; if the stem starts with "ls", the prefix is not stripped.
     // A comparison against std::string::npos was probably intended -- confirm.
1443  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1444  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1445 
     // lexical_cast throws boost::bad_lexical_cast if the remaining text is not an unsigned number;
     // no handler is visible in this function.
1446  if (!fileListLoopMode_)
1447  ls = boost::lexical_cast<unsigned int>(fileStem);
1448  else //always starting from LS 1 in loop mode
1449  ls = 1 + loopModeIterationInc_;
1450 
1451  //fsize = 0;
1452  //lockWaitTime = 0;
1453  fileListIndex_++;
1455  }
1456  else {
     // End of the list reached: either finish, or in loop mode restart from index 0 via a recursive call.
1457  if (!fileListLoopMode_)
1459  else {
1460  //loop through files until interrupted
1462  fileListIndex_=0;
1463  return getFile(ls,nextFile,fsize,lockWaitTime);
1464  }
1465  }
1466 }
#define LogDebug(id)
size
Write out results.
static const char runNumber_[]
Definition: start.py:1
void setComment(std::string const &value)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
unsigned int lumi_
std::vector< std::string > fileNames_
unsigned int getgpshigh(const unsigned char *)
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:339
bool gtpe_board_sense(const unsigned char *p)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setExceptionDetected(unsigned int ls)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
uint8_t triggerType() const
Event Trigger type identifier.
Definition: FEDHeader.cc:17
bool crc32c_hw_test()
Definition: crc32c.cc:354
unsigned int offset_
jsoncollector::DataPointDefinition * dpd_
std::pair< bool, unsigned int > getEventReport(unsigned int lumi, bool erase)
static void fillDescriptions(edm::ConfigurationDescriptions &descriptions)
static const uint32_t length
Definition: FEDTrailer.h:61
tbb::concurrent_queue< unsigned int > workerPool_
void setAllowAnything()
allow any parameter label/value pairs
unsigned int get(const unsigned char *, bool)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
static const uint32_t length
Definition: FEDHeader.h:54
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void setInState(FastMonitoringThread::InputState inputState)
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
volatile std::atomic< bool > shutdown_flag
int init
Definition: HydjetWrapper.h:67
uint16_t sourceID() const
Identifier of the FED.
Definition: FEDHeader.cc:32
evf::EvFDaqDirector::FileStatus status_
void setInStateSup(FastMonitoringThread::InputState inputState)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:504
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
void createProcessingNotificationMaybe() const
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
std::unique_ptr< std::thread > readSupervisorThread_
ProductProvenance const & dummyProvenance() const
Represents a JSON value.
Definition: value.h:111
std::string getEoLSFilePathOnBU(const unsigned int ls) const
#define nullptr
int timeout
Definition: mps_check.py:51
std::vector< std::condition_variable * > cvReader_
uint32_t fragmentLength() const
The length of the event fragment counted in 64-bit words including header and trailer.
Definition: FEDTrailer.cc:17
FedRawDataInputSource * parent_
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:209
FedRawDataInputSource(edm::ParameterSet const &, edm::InputSourceDescription const &)
U second(std::pair< T, U > const &p)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setComment(std::string const &value)
uint32_t bufferPosition_
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
Definition: TCDSRaw.h:16
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
bool advance(unsigned char *&dataPosition, const size_t size)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:346
std::vector< int > streamFileTracker_
void moveToPreviousChunk(const size_t size, const size_t offset)
unsigned char * buf_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
int grabNextJsonFile(boost::filesystem::path const &)
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
#define end
Definition: vmac.h:39
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
std::map< unsigned int, unsigned int > sourceEventsReport_
bool waitForChunk(unsigned int chunkid)
const edm::DaqProvenanceHelper daqProvenanceHelper_
unsigned int fileIndex_
unsigned long long TimeValue_t
Definition: Timestamp.h:28
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:371
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
void setInputSource(FedRawDataInputSource *inputSource)
std::atomic< bool > readComplete_
unsigned long long int rval
Definition: vlib.h:22
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:510
def ls(path, rec=False)
Definition: eostools.py:348
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
def getRunNumber(filename)
unsigned long long uint64_t
Definition: Time.h:15
unsigned int currentChunk_
auto dp
Definition: deltaR.h:22
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
def load(fileName)
Definition: svgfig.py:546
std::string fileName_
double b
Definition: hdecay.h:120
void add(std::string const &label, ParameterSetDescription const &psetDescription)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:210
void setProcessHistoryID(ProcessHistoryID const &phid)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:250
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:354
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:340
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
std::string getBoLSFilePathOnFU(const unsigned int ls) const
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:342
void readWorker(unsigned int tid)
double a
Definition: hdecay.h:121
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
unsigned int gtpe_get(const unsigned char *)
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
std::vector< std::string > const & getNames()
std::string getEoLSFilePathOnFU(const unsigned int ls) const
tbb::concurrent_queue< InputChunk * > freeChunks_
evf::EvFDaqDirector::FileStatus getNextEvent()
evf::EvFDaqDirector::FileStatus nextEvent()
void readNextChunkIntoBuffer(InputFile *file)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:253
void setFMS(evf::FastMonitoringService *fms)
unsigned int getgpslow(const unsigned char *)
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
def move(src, dest)
Definition: eostools.py:510
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: event.py:1
std::string getEoRFilePathOnFU() const