CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
boost::shared_ptr
< ActivityRegistry
actReg () const
 Accessor for Activity Registry. More...
 
boost::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (boost::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 Non-const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
boost::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
boost::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
boost::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
boost::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
boost::shared_ptr
< RunPrincipal > const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (std::auto_ptr< FEDRawDataCollection > &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void renameToNextFree (std::string const &fileName) const
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
const bool testModeNoBuilderUnit_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 122 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 52 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, testModeNoBuilderUnit_, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

53  :
54  edm::RawInputSource(pset, desc),
55  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
56  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
57  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
58  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
59  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
60  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
62  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
66  eventID_(),
69  tcds_pointer_(0),
70  eventsThisLumi_(0),
71  dpd_(nullptr)
72 {
73  char thishost[256];
74  gethostname(thishost, 255);
75  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
76  << std::endl << (eventChunkSize_/1048576)
77  << " MB on host " << thishost;
79  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
80 
82  setNewRun();
85 
86  dpd_ = new DataPointDefinition();
87  std::string defLabel = "data";
88  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
89 
90  //make sure that chunk size is N * block size
95 
96  if (!numBuffers_)
97  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
98  "no reading enabled with numBuffers parameter 0";
99 
102 
103  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
104 
105  try {
107  } catch (...){
108  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
109  assert(0);//test
110  }
111 
112  try {
114  //set DaqDirector to delete files in preGlobalEndLumi callback
117  if (fms_) daqDirector_->setFMS(fms_);
118  } catch (...){
119  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
120  assert(0);//test
121  }
122 
123  //should delete chunks when run stops
124  for (unsigned int i=0;i<numBuffers_;i++) {
126  }
127 
128  quit_threads_ = false;
129 
130  for (unsigned int i=0;i<numConcurrentReads_;i++)
131  {
132  std::unique_lock<std::mutex> lk(startupLock_);
133  //issue a memory fence here and in threads (constructor was segfaulting without this)
134  thread_quit_signal.push_back(false);
135  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
136  cvReader_.push_back(new std::condition_variable);
137  threadInit_.store(false,std::memory_order_release);
138  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
139  startupCv_.wait(lk);
140  }
141 
142  runAuxiliary()->setProcessHistoryID(processHistoryID_);
143 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
jsoncollector::DataPointDefinition * dpd_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
std::condition_variable startupCv_
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:174
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void readWorker(unsigned int tid)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 145 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

146 {
147  quit_threads_=true;
148 
149  //delete any remaining open files
150  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
151  deleteFile(it->second->fileName_);
152  delete it->second;
153  }
155  readSupervisorThread_->join();
156  }
157  else {
158  //join aux threads in case the supervisor thread was not started
159  for (unsigned int i=0;i<workerThreads_.size();i++) {
160  std::unique_lock<std::mutex> lk(mReader_);
161  thread_quit_signal[i]=true;
162  cvReader_[i]->notify_one();
163  lk.unlock();
164  workerThreads_[i]->join();
165  delete workerThreads_[i];
166  }
167  }
168  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
169  /*
170  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
171  InputChunk *ch;
172  while (!freeChunks_.try_pop(ch)) {}
173  delete ch;
174  }
175  */
176 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 178 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, and AlCaHLTBitMon_QueryRunRegistry::string.

179 {
181  {
182  //this thread opens new files and dispatches reading to worker readers
183  //threadInit_.store(false,std::memory_order_release);
184  std::unique_lock<std::mutex> lk(startupLock_);
185  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
187  startupCv_.wait(lk);
188  }
189  switch (nextEvent() ) {
191 
192  //maybe create EoL file in working directory before ending run
193  struct stat buf;
194  if ( currentLumiSection_ > 0 ) {
195  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
196  if (eolFound) {
198  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
199  if ( !found ) {
201  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
202  close(eol_fd);
204  }
205  }
206  }
207  //also create EoR file in FU data directory
208  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
209  if (!eorFound) {
210  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
211  close(eor_fd);
212  }
214  eventsThisLumi_=0;
216  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
217  return false;
218  }
220  //this is not reachable
221  return true;
222  }
224  //std::cout << "--------------NEW LUMI---------------" << std::endl;
225  return true;
226  }
227  default: {
228  if (!getLSFromFilename_) {
229  //get new lumi from file header
230  if (event_->lumi() > currentLumiSection_) {
232  eventsThisLumi_=0;
233  maybeOpenNewLumiSection( event_->lumi() );
234  }
235  }
236  eventRunNumber_=event_->run();
237  L1EventID_ = event_->event();
238 
239  setEventCached();
240 
241  return true;
242  }
243  }
244 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:381
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 535 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, convertXMLtoSQLite_cfg::fileName, LogDebug, cmsHarvester::path, python.multivaluedict::remove(), renameToNextFree(), and testModeNoBuilderUnit_.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

536 {
537  const boost::filesystem::path filePath(fileName);
538  if (!testModeNoBuilderUnit_) {
539  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
540  try {
541  //sometimes this fails but file gets deleted
542  boost::filesystem::remove(filePath);
543  }
544  catch (const boost::filesystem::filesystem_error& ex)
545  {
546  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
547  << ". Trying again.";
548  usleep(100000);
549  try {
550  boost::filesystem::remove(filePath);
551  }
552  catch (...) {/*file gets deleted first time but exception is still thrown*/}
553  }
554  catch (std::exception& ex)
555  {
556  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
557  << ". Trying again.";
558  usleep(100000);
559  try {
560  boost::filesystem::remove(filePath);
561  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
562  }
563  } else {
565  }
566 }
#define LogDebug(id)
tuple path
else: Piece not in the list, fine.
void renameToNextFree(std::string const &fileName) const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 72 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( std::auto_ptr< FEDRawDataCollection > &  rawData)
private

Definition at line 638 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, and tcds_pointer_.

Referenced by read().

639 {
640  edm::Timestamp tstamp;
641  uint32_t eventSize = event_->eventSize();
642  char* event = (char*)event_->payload();
643  GTPEventID_=0;
644  tcds_pointer_ = 0;
645  while (eventSize > 0) {
646  eventSize -= sizeof(fedt_t);
647  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
648  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
649  eventSize -= (fedSize - sizeof(fedh_t));
650  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
651  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
652  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
653  tcds_pointer_ = (unsigned char *)(event + eventSize );
654  }
655  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
656  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
657  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
658  else
659  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
660  //evf::evtn::evm_board_setformat(fedSize);
661  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
662  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
663  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
664  }
665  //take event ID from GTPE FED
666  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
667  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
668  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
669  }
670  }
671  FEDRawData& fedData = rawData->FEDData(fedId);
672  fedData.resize(fedSize);
673  memcpy(fedData.data(), event + eventSize, fedSize);
674  }
675  assert(eventSize == 0);
676 
677  return tstamp;
678 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
struct fedh_struct fedh_t
unsigned int get(const unsigned char *, bool)
unsigned int sourceid
Definition: fed_header.h:32
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 295 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, edm::hlt::Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, and InputFile::waitForChunk().

Referenced by nextEvent().

296 {
297  const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader
298 
300  if (!currentFile_)
301  {
302  if (!streamFileTrackerPtr_) {
306  }
307 
309  if (!fileQueue_.try_pop(currentFile_))
310  {
311  //sleep until wakeup (only in single-buffer mode) or timeout
312  std::unique_lock<std::mutex> lkw(mWakeup_);
313  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
315  }
316  status = currentFile_->status_;
317  if ( status == evf::EvFDaqDirector::runEnded)
318  {
319  delete currentFile_;
320  currentFile_=nullptr;
321  return status;
322  }
323 
324  else if (status == evf::EvFDaqDirector::newLumi)
325  {
326  if (getLSFromFilename_) {
329  eventsThisLumi_=0;
331  }
332  }
333  else {//let this be picked up from next event
335  }
336 
337  delete currentFile_;
338  currentFile_=nullptr;
339  return status;
340  }
341  else if (status == evf::EvFDaqDirector::newFile) {
344  }
345  else
346  assert(0);
347  }
348 
349  //file is empty
350  if (!currentFile_->fileSize_) {
351  //try to open new lumi
352  assert(currentFile_->nChunks_==0);
353  if (getLSFromFilename_)
356  eventsThisLumi_=0;
358  }
359  //immediately delete empty file
361  delete currentFile_;
362  currentFile_=nullptr;
364  }
365 
366  //file is finished
368  //release last chunk (it is never released elsewhere)
371  {
372  throw cms::Exception("RuntimeError")
373  << "Fully processed " << currentFile_->nProcessed_
374  << " from the file " << currentFile_->fileName_
375  << " but according to BU JSON there should be "
376  << currentFile_->nEvents_ << " events";
377  }
378  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
379  if (singleBufferMode_) {
380  std::unique_lock<std::mutex> lkw(mWakeup_);
381  cvWakeup_.notify_one();
382  }
385  //put the file in pending delete list;
386  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
387  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
388  }
389  else {
390  //in single-thread and stream jobs, events are already processed
392  delete currentFile_;
393  }
394  currentFile_=nullptr;
396  }
397 
398 
399  //file is too short
401  {
402  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
403  "Premature end of input file while reading event header";
404  }
405  if (singleBufferMode_) {
406 
407  //should already be there
409  usleep(10000);
411  }
412 
413  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
414 
415  //conditions when read amount is not sufficient for the header to fit
416  if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
417  || eventChunkSize_ - currentFile_->chunkPosition_ < headerSize[detectedFRDversion_])
418  {
420 
421  if (detectedFRDversion_==0) {
422  detectedFRDversion_=*((uint32*)dataPosition);
423  assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
424  }
425 
426  //recalculate chunk position
427  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
428  if ( bufferInputRead_ < headerSize[detectedFRDversion_])
429  {
430  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
431  "Premature end of input file while reading event header";
432  }
433  }
434 
435  event_.reset( new FRDEventMsgView(dataPosition) );
436  if (event_->size()>eventChunkSize_) {
437  throw cms::Exception("FedRawDataInputSource::nextEvent")
438  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
439  << " run:" << event_->run() << " of size:" << event_->size()
440  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
441  }
442 
443  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
444 
446  {
447  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
448  "Premature end of input file while reading event data";
449  }
450  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
452  //recalculate chunk position
453  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
454  event_.reset( new FRDEventMsgView(dataPosition) );
455  }
456  currentFile_->bufferPosition_ += event_->size();
457  currentFile_->chunkPosition_ += event_->size();
458  //last chunk is released when this function is invoked next time
459 
460  }
461  //multibuffer mode:
462  else
463  {
464  //wait for the current chunk to become added to the vector
466  usleep(10000);
468  }
469 
470  //check if header is at the boundary of two chunks
471  chunkIsFree_ = false;
472  unsigned char *dataPosition;
473 
474  //read header, copy it to a single chunk if necessary
475  bool chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]);
476 
477  event_.reset( new FRDEventMsgView(dataPosition) );
478  if (event_->size()>eventChunkSize_) {
479  throw cms::Exception("FedRawDataInputSource::nextEvent")
480  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
481  << " run:" << event_->run() << " of size:" << event_->size()
482  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
483  }
484 
485  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
486 
488  {
489  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
490  "Premature end of input file while reading event data";
491  }
492 
493  if (chunkEnd) {
494  //header was at the chunk boundary, we will have to move payload as well
495  currentFile_->moveToPreviousChunk(msgSize,headerSize[detectedFRDversion_]);
496  chunkIsFree_ = true;
497  }
498  else {
499  //header was contiguous, but check if payload fits the chunk
500  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
501  //rewind to header start position
502  currentFile_->rewindChunk(headerSize[detectedFRDversion_]);
503  //copy event to a chunk start and move pointers
504  chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]+msgSize);
505  assert(chunkEnd);
506  chunkIsFree_=true;
507  //header is moved
508  event_.reset( new FRDEventMsgView(dataPosition) );
509  }
510  else {
511  //everything is in a single chunk, only move pointers forward
512  chunkEnd = currentFile_->advance(dataPosition,msgSize);
513  assert(!chunkEnd);
514  chunkIsFree_=false;
515  }
516  }
517  }//end multibuffer mode
518 
519  if ( verifyAdler32_ && event_->version() >= 3 )
520  {
521  uint32_t adler = adler32(0L,Z_NULL,0);
522  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
523 
524  if ( adler != event_->adler32() ) {
525  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
526  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
527  " but calculated 0x" << adler;
528  }
529  }
531 
533 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 680 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), dpd_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), cmsHarvester::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, testModeNoBuilderUnit_, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

681 {
683  try {
684  // assemble json destination path
686 
687  //TODO:should be ported to use fffnaming
688  std::ostringstream fileNameWithPID;
689  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
690  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
691  jsonDestPath /= fileNameWithPID.str();
692 
693  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
694  << jsonDestPath;
695 
697  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
698  else {
699  try {
700  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
701  }
702  catch (const boost::filesystem::filesystem_error& ex)
703  {
704  // Input dir gone?
705  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
706  // << " Maybe the file is not yet visible by FU. Trying again in one second";
707  sleep(1);
708  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
709  }
711 
712 
713  try {
714  //sometimes this fails but file gets deleted
715  boost::filesystem::remove(jsonSourcePath);
716  }
717  catch (const boost::filesystem::filesystem_error& ex)
718  {
719  // Input dir gone?
720  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
721  }
722  catch (std::exception& ex)
723  {
724  // Input dir gone?
725  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
726  }
727 
728  }
729 
730  boost::filesystem::ifstream ij(jsonDestPath);
731  Json::Value deserializeRoot;
733 
734  if (!reader.parse(ij, deserializeRoot))
735  throw std::runtime_error("Cannot deserialize input JSON file");
736 
737  //read BU JSON
739  DataPoint dp;
740  dp.deserialize(deserializeRoot);
741  bool success = false;
742  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
743  if (dpd_->getNames().at(i)=="NEvents")
744  if (i<dp.getData().size()) {
745  data = dp.getData()[i];
746  success=true;
747  }
748  }
749  if (!success) {
750  if (dp.getData().size())
751  data = dp.getData()[0];
752  else
753  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
754  " error reading number of events from BU JSON -: No input value " << data;
755  }
756  return boost::lexical_cast<int>(data);
757 
758  }
759  catch (const boost::filesystem::filesystem_error& ex)
760  {
761  // Input dir gone?
763  edm::LogError("FedRawDataInputSource") << "grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
764  << " - Maybe the BU run dir disappeared? Ending process with code 0...";
765  _exit(-1);
766  }
767  catch (std::runtime_error e)
768  {
769  // Another process grabbed the file and NFS did not register this
771  edm::LogError("FedRawDataInputSource") << "grabNextFile - runtime Exception -: " << e.what();
772  }
773 
774  catch( boost::bad_lexical_cast const& ) {
775  edm::LogError("FedRawDataInputSource") << "grabNextFile - error parsing number of events from BU JSON. "
776  << "Input value is -: " << data;
777  }
778 
779  catch (std::exception e)
780  {
781  // BU run directory disappeared?
783  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
784  }
785 
786  return -1;
787 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
tuple path
else: Piece not in the list, fine.
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 246 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

247 {
249  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
250 
251  if ( currentLumiSection_ > 0 ) {
252  const std::string fuEoLS =
254  struct stat buf;
255  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
256  if ( !found ) {
258  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(eol_fd);
261  }
262  }
263 
264  currentLumiSection_ = lumiSection;
265 
267 
268  timeval tv;
269  gettimeofday(&tv, 0);
270  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
271 
272  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
274  runAuxiliary()->run(),
275  lumiSection, lsopentime,
277 
278  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
279  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
280 
281  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
282  }
283 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:595
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:354
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:601
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 285 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

286 {
288  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
289  {
290  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
291  }
292  return status;
293 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( boost::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 803 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

804 {
805  InputSource::rewind();
809 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 800 of file FedRawDataInputSource.cc.

801 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 569 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

570 {
571  std::auto_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
572  edm::Timestamp tstamp = fillFEDRawDataCollection(rawData);
573  if (useL1EventID_){
575  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
577  aux.setProcessHistoryID(processHistoryID_);
578  makeEvent(eventPrincipal, aux);
579  }
580  else if(tcds_pointer_==0){
581  assert(GTPEventID_);
583  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
585  aux.setProcessHistoryID(processHistoryID_);
586  makeEvent(eventPrincipal, aux);
587  }
588  else{
589  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
592  processGUID());
594  makeEvent(eventPrincipal, aux);
595  }
596 
597 
598 
601 
602  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
603  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
604  // daqProvenanceHelper_.dummyProvenance_);
605 
606  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), edp,
608 
609  eventsThisLumi_++;
610 
611  //this old file check runs no more often than every 10 events
612  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
613  //delete files that are not in processing
614  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
615  auto it = filesToDelete_.begin();
616  while (it!=filesToDelete_.end()) {
617  bool fileIsBeingProcessed = false;
618  for (unsigned int i=0;i<nStreams_;i++) {
619  if (it->first == streamFileTrackerPtr_->at(i)) {
620  fileIsBeingProcessed = true;
621  break;
622  }
623  }
624  if (!fileIsBeingProcessed) {
625  deleteFile(it->second->fileName_);
626  delete it->second;
627  it = filesToDelete_.erase(it);
628  }
629  else it++;
630  }
631 
632  }
634  chunkIsFree_=false;
635  return;
636 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:214
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &)
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1124 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1125 {
1126 
1127  if (fileDescriptor_<0) {
1128  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1129  bufferInputRead_ = 0;
1130  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1131  if (fileDescriptor_>=0)
1132  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1133  else
1134  {
1135  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1136  << file->fileName_ << " fd:" << fileDescriptor_;
1137  }
1138  }
1139 
1140  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1141  uint32_t existingSize = 0;
1142  for (unsigned int i=0;i<readBlocks_;i++)
1143  {
1144  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1145  bufferInputRead_+=last;
1146  existingSize+=last;
1147  }
1148  }
1149  else {
1150  const uint32_t chunksize = file->chunkPosition_;
1151  const uint32_t blockcount=chunksize/eventChunkBlock_;
1152  const uint32_t leftsize = chunksize%eventChunkBlock_;
1153  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1154  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1155 
1156  for (uint32_t i=0;i<blockcount;i++) {
1157  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1158  bufferInputRead_+=last;
1159  existingSize+=last;
1160  }
1161  if (leftsize) {
1162  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1163  bufferInputRead_+=last;
1164  existingSize+=last;
1165  }
1166  file->chunkPosition_=0;//data was moved to beginning of the chunk
1167  }
1168  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1169  if (fileDescriptor_!=-1)
1170  {
1171  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1172  close(fileDescriptor_);
1173  fileDescriptor_=-1;
1174  }
1175  }
1176 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 815 of file FedRawDataInputSource.cc.

References InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, cmsHarvester::path, quit_threads_, InputChunk::readComplete_, InputChunk::reset(), evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

816 {
817  bool stop=false;
818  unsigned int currentLumiSection = 0;
819  //threadInit_.exchange(true,std::memory_order_acquire);
820 
821  {
822  std::unique_lock<std::mutex> lk(startupLock_);
823  startupCv_.notify_one();
824  }
825 
826  while (!stop) {
827 
828  //wait for at least one free thread and chunk
829  int counter=0;
830  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
831  {
832  std::unique_lock<std::mutex> lkw(mWakeup_);
833  //sleep until woken up by condition or a timeout
834  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
835  counter++;
836  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
837  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
838  }
839  else {
840  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
841  }
842  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
843  }
844 
845  if (stop) break;
846 
847  //look for a new file
848  std::string nextFile;
849  uint32_t ls;
850  uint32_t fileSize;
851 
853 
855 
856  while (status == evf::EvFDaqDirector::noFile) {
857  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
858  stop=true;
859  break;
860  }
861  else
862  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
863 
864  if ( status == evf::EvFDaqDirector::runEnded) {
866  stop=true;
867  break;
868  }
869 
870  //queue new lumisection
871  if( getLSFromFilename_ && ls > currentLumiSection) {
872  currentLumiSection = ls;
873  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
874  }
875 
876  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
877  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
878  _exit(-1);
879  }
880 
881  int dbgcount=0;
882  if (status == evf::EvFDaqDirector::noFile) {
883  dbgcount++;
884  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
885  usleep(100000);
886  }
887  }
888  if ( status == evf::EvFDaqDirector::newFile ) {
889  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
890 
891 
892  boost::filesystem::path rawFilePath(nextFile);
893  std::string rawFile = rawFilePath.replace_extension(".raw").string();
894 
895  struct stat st;
896  stat(rawFile.c_str(),&st);
897  fileSize=st.st_size;
898 
899  int eventsInNewFile = grabNextJsonFile(nextFile);
900  if (fms_) fms_->stoppedLookingForFile(ls);
901  assert( eventsInNewFile>=0 );
902  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
903 
904  if (!singleBufferMode_) {
905  //calculate number of needed chunks
906  unsigned int neededChunks = fileSize/eventChunkSize_;
907  if (fileSize%eventChunkSize_) neededChunks++;
908 
909  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
910  fileQueue_.push(newInputFile);
911 
912  for (unsigned int i=0;i<neededChunks;i++) {
913 
914  //get thread
915  unsigned int newTid = 0xffffffff;
916  while (!workerPool_.try_pop(newTid)) {
917  usleep(100000);
918  }
919 
920  InputChunk * newChunk = nullptr;
921  while (!freeChunks_.try_pop(newChunk)) {
922  usleep(100000);
923  if (quit_threads_.load(std::memory_order_relaxed)) break;
924  }
925 
926  if (newChunk == nullptr) {
927  //return unused tid if we received shutdown (nullptr chunk)
928  if (newTid!=0xffffffff) workerPool_.push(newTid);
929  stop = true;
930  break;
931  }
932 
933  std::unique_lock<std::mutex> lk(mReader_);
934 
935  unsigned int toRead = eventChunkSize_;
936  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
937  newChunk->reset(i*eventChunkSize_,toRead,i);
938 
939  workerJob_[newTid].first=newInputFile;
940  workerJob_[newTid].second=newChunk;
941 
942  //wake up the worker thread
943  cvReader_[newTid]->notify_one();
944  }
945  }
946  else {
947  if (!eventsInNewFile) {
948  //still queue file for lumi update
949  std::unique_lock<std::mutex> lkw(mWakeup_);
950  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
951  fileQueue_.push(newInputFile);
952  cvWakeup_.notify_one();
953  return;
954  }
955  //in single-buffer mode put single chunk in the file and let the main thread read the file
956  InputChunk * newChunk;
957  //should be available immediately
958  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
959 
960  std::unique_lock<std::mutex> lkw(mWakeup_);
961 
962  unsigned int toRead = eventChunkSize_;
963  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
964  newChunk->reset(0,toRead,0);
965  newChunk->readComplete_=true;
966 
967  //push file and wakeup main thread
968  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
969  newInputFile->chunks_[0]=newChunk;
970  fileQueue_.push(newInputFile);
971  cvWakeup_.notify_one();
972  }
973  }
974  }
975  //make sure threads finish reading
976  unsigned numFinishedThreads = 0;
977  while (numFinishedThreads < workerThreads_.size()) {
978  unsigned int tid;
979  while (!workerPool_.try_pop(tid)) {usleep(10000);}
980  std::unique_lock<std::mutex> lk(mReader_);
981  thread_quit_signal[tid]=true;
982  cvReader_[tid]->notify_one();
983  numFinishedThreads++;
984  }
985  for (unsigned int i=0;i<workerThreads_.size();i++) {
986  workerThreads_[i]->join();
987  delete workerThreads_[i];
988  }
989 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
tuple path
else: Piece not in the list, fine.
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 991 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, first, i, init, prof2calltree::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

992 {
993  bool init = true;
994  threadInit_.exchange(true,std::memory_order_acquire);
995 
996  while (1) {
997 
998  std::unique_lock<std::mutex> lk(mReader_);
999  workerJob_[tid].first=nullptr;
1000  workerJob_[tid].first=nullptr;
1001 
1002  assert(!thread_quit_signal[tid]);//should never get it here
1003  workerPool_.push(tid);
1004 
1005  if (init) {
1006  std::unique_lock<std::mutex> lk(startupLock_);
1007  init = false;
1008  startupCv_.notify_one();
1009  }
1010  cvReader_[tid]->wait(lk);
1011 
1012  if (thread_quit_signal[tid]) return;
1013 
1014  InputFile * file;
1015  InputChunk * chunk;
1016 
1017  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1018 
1019  file = workerJob_[tid].first;
1020  chunk = workerJob_[tid].second;
1021 
1022  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1023  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1024 
1025 
1026  if (fileDescriptor>=0)
1027  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1028  else
1029  {
1030  edm::LogError("FedRawDataInputSource") <<
1031  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1032  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1033  setExceptionState_=true;
1034  return;
1035 
1036  }
1037 
1038  unsigned int bufferLeft = 0;
1040  for (unsigned int i=0;i<readBlocks_;i++)
1041  {
1042  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1043  if ( last > 0 )
1044  bufferLeft+=last;
1045  if (last < eventChunkBlock_) {
1046  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1047  break;
1048  }
1049  }
1051  auto diff = end-start;
1052  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1053  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1054  close(fileDescriptor);
1055 
1056  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1057  assert(detectedFRDversion_<=3);
1058  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1059  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1060 
1061  }
1062 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
bool first
Definition: L1TdeRCT.cc:75
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::renameToNextFree ( std::string const &  fileName) const
private

Definition at line 789 of file FedRawDataInputSource.cc.

References daqDirector_, roll_playback::destination, evf::EvFDaqDirector::getJumpFilePath(), cmsHarvester::path, and source.

Referenced by deleteFile().

790 {
793 
794  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
795  << " to: " << destination.string();
796  boost::filesystem::rename(source,destination);
797  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
798 }
string destination
tuple path
else: Piece not in the list, fine.
evf::EvFDaqDirector * daqDirector_
std::string getJumpFilePath() const
static std::string const source
Definition: EdmProvDump.cc:43
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 811 of file FedRawDataInputSource.cc.

812 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1064 of file FedRawDataInputSource.cc.

References edm::hlt::Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1065 {
1066  quit_threads_=true;
1067  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1068 
1069 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 81 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 112 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 161 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 149 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 129 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

const bool FedRawDataInputSource::testModeNoBuilderUnit_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and grabNextJsonFile().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private