CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
boost::shared_ptr
< ActivityRegistry
actReg () const
 Accessor for Activity Registry. More...
 
boost::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (boost::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 Non-const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
boost::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
boost::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
boost::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
boost::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
boost::shared_ptr
< RunPrincipal > const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (std::auto_ptr< FEDRawDataCollection > &)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void renameToNextFree (std::string const &fileName) const
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = nullptr
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ =nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ =nullptr
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = nullptr
 
const bool testModeNoBuilderUnit_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 39 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 117 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 49 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, testModeNoBuilderUnit_, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

50  :
51  edm::RawInputSource(pset, desc),
52  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
53  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
54  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
55  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
56  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
57  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
58  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
59  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
63  eventID_(),
66  eventsThisLumi_(0),
67  dpd_(nullptr)
68 {
69  char thishost[256];
70  gethostname(thishost, 255);
71  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
72  << std::endl << (eventChunkSize_/1048576)
73  << " MB on host " << thishost;
75  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
76 
78  setNewRun();
81 
82  dpd_ = new DataPointDefinition();
83  std::string defLabel = "data";
84  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
85 
86  //make sure that chunk size is N * block size
91 
92  if (!numBuffers_)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
94  "no reading enabled with numBuffers parameter 0";
95 
98 
99  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
100 
101  try {
103  } catch (...){
104  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
105  assert(0);//test
106  }
107 
108  try {
110  //set DaqDirector to delete files in preGlobalEndLumi callback
113  if (fms_) daqDirector_->setFMS(fms_);
114  } catch (...){
115  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
116  assert(0);//test
117  }
118 
119  //should delete chunks when run stops
120  for (unsigned int i=0;i<numBuffers_;i++) {
122  }
123 
124  quit_threads_ = false;
125 
126  for (unsigned int i=0;i<numConcurrentReads_;i++)
127  {
128  std::unique_lock<std::mutex> lk(startupLock_);
129  //issue a memory fence here and in threads (constructor was segfaulting without this)
130  thread_quit_signal.push_back(false);
131  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
132  cvReader_.push_back(new std::condition_variable);
133  threadInit_.store(false,std::memory_order_release);
134  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
135  startupCv_.wait(lk);
136  }
137 
138  runAuxiliary()->setProcessHistoryID(processHistoryID_);
139 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
jsoncollector::DataPointDefinition * dpd_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
std::condition_variable startupCv_
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:174
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void readWorker(unsigned int tid)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 141 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

142 {
143  quit_threads_=true;
144 
145  //delete any remaining open files
146  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
147  deleteFile(it->second->fileName_);
148  delete it->second;
149  }
151  readSupervisorThread_->join();
152  }
153  else {
154  //join aux threads in case the supervisor thread was not started
155  for (unsigned int i=0;i<workerThreads_.size();i++) {
156  std::unique_lock<std::mutex> lk(mReader_);
157  thread_quit_signal[i]=true;
158  cvReader_[i]->notify_one();
159  lk.unlock();
160  workerThreads_[i]->join();
161  delete workerThreads_[i];
162  }
163  }
164  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
165  /*
166  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
167  InputChunk *ch;
168  while (!freeChunks_.try_pop(ch)) {}
169  delete ch;
170  }
171  */
172 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 174 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, and AlCaHLTBitMon_QueryRunRegistry::string.

175 {
177  {
178  //this thread opens new files and dispatches reading to worker readers
179  //threadInit_.store(false,std::memory_order_release);
180  std::unique_lock<std::mutex> lk(startupLock_);
181  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
183  startupCv_.wait(lk);
184  }
185  switch (nextEvent() ) {
187 
188  //maybe create EoL file in working directory before ending run
189  struct stat buf;
190  if ( currentLumiSection_ > 0 ) {
191  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
192  if (eolFound) {
194  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
195  if ( !found ) {
197  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
198  close(eol_fd);
200  }
201  }
202  }
203  //also create EoR file in FU data directory
204  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
205  if (!eorFound) {
206  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
207  close(eor_fd);
208  }
210  eventsThisLumi_=0;
212  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
213  return false;
214  }
216  //this is not reachable
217  return true;
218  }
220  //std::cout << "--------------NEW LUMI---------------" << std::endl;
221  return true;
222  }
223  default: {
224  if (!getLSFromFilename_) {
225  //get new lumi from file header
226  if (event_->lumi() > currentLumiSection_) {
228  eventsThisLumi_=0;
229  maybeOpenNewLumiSection( event_->lumi() );
230  }
231  }
232  eventRunNumber_=event_->run();
233  L1EventID_ = event_->event();
234 
235  setEventCached();
236 
237  return true;
238  }
239  }
240 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:381
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 531 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, convertXMLtoSQLite_cfg::fileName, LogDebug, cmsHarvester::path, python.multivaluedict::remove(), renameToNextFree(), and testModeNoBuilderUnit_.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

532 {
533  const boost::filesystem::path filePath(fileName);
534  if (!testModeNoBuilderUnit_) {
535  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
536  try {
537  //sometimes this fails but file gets deleted
538  boost::filesystem::remove(filePath);
539  }
540  catch (const boost::filesystem::filesystem_error& ex)
541  {
542  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
543  << ". Trying again.";
544  usleep(100000);
545  try {
546  boost::filesystem::remove(filePath);
547  }
548  catch (...) {/*file gets deleted first time but exception is still thrown*/}
549  }
550  catch (std::exception& ex)
551  {
552  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
553  << ". Trying again.";
554  usleep(100000);
555  try {
556  boost::filesystem::remove(filePath);
557  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
558  }
559  } else {
561  }
562 }
#define LogDebug(id)
tuple path
else: Piece not in the list, fine.
void renameToNextFree(std::string const &fileName) const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 68 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( std::auto_ptr< FEDRawDataCollection > &  rawData)
private

Definition at line 621 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), and fedh_struct::sourceid.

Referenced by read().

622 {
623  edm::Timestamp tstamp;
624  uint32_t eventSize = event_->eventSize();
625  char* event = (char*)event_->payload();
626  GTPEventID_=0;
627 
628  while (eventSize > 0) {
629  eventSize -= sizeof(fedt_t);
630  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
631  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
632  eventSize -= (fedSize - sizeof(fedh_t));
633  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
634  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
635  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
636  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
637  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
638  else
639  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
640  //evf::evtn::evm_board_setformat(fedSize);
641  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
642  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
643  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
644  }
645  //take event ID from GTPE FED
646  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
647  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
648  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
649  }
650  }
651  FEDRawData& fedData = rawData->FEDData(fedId);
652  fedData.resize(fedSize);
653  memcpy(fedData.data(), event + eventSize, fedSize);
654  }
655  assert(eventSize == 0);
656 
657  return tstamp;
658 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
struct fedh_struct fedh_t
unsigned int get(const unsigned char *, bool)
unsigned int sourceid
Definition: fed_header.h:32
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 291 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, edm::hlt::Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, and InputFile::waitForChunk().

Referenced by nextEvent().

292 {
293  const size_t headerSize[4] = {0,2*sizeof(uint32),(4 + 1024) * sizeof(uint32),7*sizeof(uint32)}; //size per version of FRDEventHeader
294 
296  if (!currentFile_)
297  {
298  if (!streamFileTrackerPtr_) {
302  }
303 
305  if (!fileQueue_.try_pop(currentFile_))
306  {
307  //sleep until wakeup (only in single-buffer mode) or timeout
308  std::unique_lock<std::mutex> lkw(mWakeup_);
309  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
311  }
312  status = currentFile_->status_;
313  if ( status == evf::EvFDaqDirector::runEnded)
314  {
315  delete currentFile_;
316  currentFile_=nullptr;
317  return status;
318  }
319 
320  else if (status == evf::EvFDaqDirector::newLumi)
321  {
322  if (getLSFromFilename_) {
325  eventsThisLumi_=0;
327  }
328  }
329  else {//let this be picked up from next event
331  }
332 
333  delete currentFile_;
334  currentFile_=nullptr;
335  return status;
336  }
337  else if (status == evf::EvFDaqDirector::newFile) {
340  }
341  else
342  assert(0);
343  }
344 
345  //file is empty
346  if (!currentFile_->fileSize_) {
347  //try to open new lumi
348  assert(currentFile_->nChunks_==0);
349  if (getLSFromFilename_)
352  eventsThisLumi_=0;
354  }
355  //immediately delete empty file
357  delete currentFile_;
358  currentFile_=nullptr;
360  }
361 
362  //file is finished
364  //release last chunk (it is never released elsewhere)
367  {
368  throw cms::Exception("RuntimeError")
369  << "Fully processed " << currentFile_->nProcessed_
370  << " from the file " << currentFile_->fileName_
371  << " but according to BU JSON there should be "
372  << currentFile_->nEvents_ << " events";
373  }
374  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
375  if (singleBufferMode_) {
376  std::unique_lock<std::mutex> lkw(mWakeup_);
377  cvWakeup_.notify_one();
378  }
381  //put the file in pending delete list;
382  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
383  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
384  }
385  else {
386  //in single-thread and stream jobs, events are already processed
388  delete currentFile_;
389  }
390  currentFile_=nullptr;
392  }
393 
394 
395  //file is too short
397  {
398  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
399  "Premature end of input file while reading event header";
400  }
401  if (singleBufferMode_) {
402 
403  //should already be there
405  usleep(10000);
407  }
408 
409  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
410 
411  //conditions when read amount is not sufficient for the header to fit
412  if (!bufferInputRead_ || bufferInputRead_ < headerSize[detectedFRDversion_]
413  || eventChunkSize_ - currentFile_->chunkPosition_ < headerSize[detectedFRDversion_])
414  {
416 
417  if (detectedFRDversion_==0) {
418  detectedFRDversion_=*((uint32*)dataPosition);
419  assert(detectedFRDversion_>=1 && detectedFRDversion_<=3);
420  }
421 
422  //recalculate chunk position
423  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
424  if ( bufferInputRead_ < headerSize[detectedFRDversion_])
425  {
426  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
427  "Premature end of input file while reading event header";
428  }
429  }
430 
431  event_.reset( new FRDEventMsgView(dataPosition) );
432  if (event_->size()>eventChunkSize_) {
433  throw cms::Exception("FedRawDataInputSource::nextEvent")
434  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
435  << " run:" << event_->run() << " of size:" << event_->size()
436  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
437  }
438 
439  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
440 
442  {
443  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
444  "Premature end of input file while reading event data";
445  }
446  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
448  //recalculate chunk position
449  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
450  event_.reset( new FRDEventMsgView(dataPosition) );
451  }
452  currentFile_->bufferPosition_ += event_->size();
453  currentFile_->chunkPosition_ += event_->size();
454  //last chunk is released when this function is invoked next time
455 
456  }
457  //multibuffer mode:
458  else
459  {
460  //wait for the current chunk to become added to the vector
462  usleep(10000);
464  }
465 
466  //check if header is at the boundary of two chunks
467  chunkIsFree_ = false;
468  unsigned char *dataPosition;
469 
470  //read header, copy it to a single chunk if necessary
471  bool chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]);
472 
473  event_.reset( new FRDEventMsgView(dataPosition) );
474  if (event_->size()>eventChunkSize_) {
475  throw cms::Exception("FedRawDataInputSource::nextEvent")
476  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
477  << " run:" << event_->run() << " of size:" << event_->size()
478  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
479  }
480 
481  const uint32_t msgSize = event_->size()-headerSize[detectedFRDversion_];
482 
484  {
485  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
486  "Premature end of input file while reading event data";
487  }
488 
489  if (chunkEnd) {
490  //header was at the chunk boundary, we will have to move payload as well
491  currentFile_->moveToPreviousChunk(msgSize,headerSize[detectedFRDversion_]);
492  chunkIsFree_ = true;
493  }
494  else {
495  //header was contiguous, but check if payload fits the chunk
496  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
497  //rewind to header start position
498  currentFile_->rewindChunk(headerSize[detectedFRDversion_]);
499  //copy event to a chunk start and move pointers
500  chunkEnd = currentFile_->advance(dataPosition,headerSize[detectedFRDversion_]+msgSize);
501  assert(chunkEnd);
502  chunkIsFree_=true;
503  //header is moved
504  event_.reset( new FRDEventMsgView(dataPosition) );
505  }
506  else {
507  //everything is in a single chunk, only move pointers forward
508  chunkEnd = currentFile_->advance(dataPosition,msgSize);
509  assert(!chunkEnd);
510  chunkIsFree_=false;
511  }
512  }
513  }//end multibuffer mode
514 
515  if ( verifyAdler32_ && event_->version() >= 3 )
516  {
517  uint32_t adler = adler32(0L,Z_NULL,0);
518  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
519 
520  if ( adler != event_->adler32() ) {
521  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
522  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
523  " but calculated 0x" << adler;
524  }
525  }
527 
529 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 660 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), cmsHarvester::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, testModeNoBuilderUnit_, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

661 {
663  try {
664  // assemble json destination path
666 
667  //TODO:should be ported to use fffnaming
668  std::ostringstream fileNameWithPID;
669  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
670  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
671  jsonDestPath /= fileNameWithPID.str();
672 
673  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
674  << jsonDestPath;
675 
677  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
678  else {
679  try {
680  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
681  }
682  catch (const boost::filesystem::filesystem_error& ex)
683  {
684  // Input dir gone?
685  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
686  // << " Maybe the file is not yet visible by FU. Trying again in one second";
687  sleep(1);
688  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
689  }
691 
692 
693  try {
694  //sometimes this fails but file gets deleted
695  boost::filesystem::remove(jsonSourcePath);
696  }
697  catch (const boost::filesystem::filesystem_error& ex)
698  {
699  // Input dir gone?
700  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
701  }
702  catch (std::exception& ex)
703  {
704  // Input dir gone?
705  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
706  }
707 
708  }
709 
710  boost::filesystem::ifstream ij(jsonDestPath);
711  Json::Value deserializeRoot;
713 
714  if (!reader.parse(ij, deserializeRoot))
715  throw std::runtime_error("Cannot deserialize input JSON file");
716 
717  //read BU JSON
719  DataPoint dp;
720  dp.deserialize(deserializeRoot);
721  bool success = false;
722  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
723  if (dpd_->getNames().at(i)=="NEvents")
724  if (i<dp.getData().size()) {
725  data = dp.getData()[i];
726  success=true;
727  }
728  }
729  if (!success) {
730  if (dp.getData().size())
731  data = dp.getData()[0];
732  else
733  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
734  " error reading number of events from BU JSON -: No input value " << data;
735  }
736  return boost::lexical_cast<int>(data);
737 
738  }
739  catch (const boost::filesystem::filesystem_error& ex)
740  {
741  // Input dir gone?
743  edm::LogError("FedRawDataInputSource") << "grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
744  << " - Maybe the BU run dir disappeared? Ending process with code 0...";
745  _exit(-1);
746  }
747  catch (std::runtime_error e)
748  {
749  // Another process grabbed the file and NFS did not register this
751  edm::LogError("FedRawDataInputSource") << "grabNextFile - runtime Exception -: " << e.what();
752  }
753 
754  catch( boost::bad_lexical_cast const& ) {
755  edm::LogError("FedRawDataInputSource") << "grabNextFile - error parsing number of events from BU JSON. "
756  << "Input value is -: " << data;
757  }
758 
759  catch (std::exception e)
760  {
761  // BU run directory disappeared?
763  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
764  }
765 
766  return -1;
767 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
tuple path
else: Piece not in the list, fine.
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
auto dp
Definition: deltaR.h:24
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 242 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

243 {
245  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
246 
247  if ( currentLumiSection_ > 0 ) {
248  const std::string fuEoLS =
250  struct stat buf;
251  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
252  if ( !found ) {
254  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
255  close(eol_fd);
257  }
258  }
259 
260  currentLumiSection_ = lumiSection;
261 
263 
264  timeval tv;
265  gettimeofday(&tv, 0);
266  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
267 
268  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
270  runAuxiliary()->run(),
271  lumiSection, lsopentime,
273 
274  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
275  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
276 
277  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
278  }
279 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:595
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:354
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:601
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 281 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

282 {
284  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
285  {
286  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
287  }
288  return status;
289 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( boost::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 783 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

784 {
785  InputSource::rewind();
789 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 780 of file FedRawDataInputSource.cc.

781 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 565 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, and useL1EventID_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

566 {
567  std::auto_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
568  edm::Timestamp tstamp = fillFEDRawDataCollection(rawData);
569 
570  if (useL1EventID_)
572  else {
573  assert(GTPEventID_);
575  }
576 
577  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
579  aux.setProcessHistoryID(processHistoryID_);
580  makeEvent(eventPrincipal, aux);
581 
584 
585  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
586  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
587  // daqProvenanceHelper_.dummyProvenance_);
588 
589  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), edp,
591 
592  eventsThisLumi_++;
593 
594  //this old file check runs no more often than every 10 events
595  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
596  //delete files that are not in processing
597  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
598  auto it = filesToDelete_.begin();
599  while (it!=filesToDelete_.end()) {
600  bool fileIsBeingProcessed = false;
601  for (unsigned int i=0;i<nStreams_;i++) {
602  if (it->first == streamFileTrackerPtr_->at(i)) {
603  fileIsBeingProcessed = true;
604  break;
605  }
606  }
607  if (!fileIsBeingProcessed) {
608  deleteFile(it->second->fileName_);
609  delete it->second;
610  it = filesToDelete_.erase(it);
611  }
612  else it++;
613  }
614 
615  }
617  chunkIsFree_=false;
618  return;
619 }
int i
Definition: DBlmapReader.cc:9
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:214
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &)
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1104 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1105 {
1106 
1107  if (fileDescriptor_<0) {
1108  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1109  bufferInputRead_ = 0;
1110  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1111  if (fileDescriptor_>=0)
1112  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1113  else
1114  {
1115  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1116  << file->fileName_ << " fd:" << fileDescriptor_;
1117  }
1118  }
1119 
1120  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1121  uint32_t existingSize = 0;
1122  for (unsigned int i=0;i<readBlocks_;i++)
1123  {
1124  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1125  bufferInputRead_+=last;
1126  existingSize+=last;
1127  }
1128  }
1129  else {
1130  const uint32_t chunksize = file->chunkPosition_;
1131  const uint32_t blockcount=chunksize/eventChunkBlock_;
1132  const uint32_t leftsize = chunksize%eventChunkBlock_;
1133  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1134  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1135 
1136  for (uint32_t i=0;i<blockcount;i++) {
1137  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1138  bufferInputRead_+=last;
1139  existingSize+=last;
1140  }
1141  if (leftsize) {
1142  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1143  bufferInputRead_+=last;
1144  existingSize+=last;
1145  }
1146  file->chunkPosition_=0;//data was moved to beginning of the chunk
1147  }
1148  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1149  if (fileDescriptor_!=-1)
1150  {
1151  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1152  close(fileDescriptor_);
1153  fileDescriptor_=-1;
1154  }
1155  }
1156 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 795 of file FedRawDataInputSource.cc.

References InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, cmsHarvester::path, quit_threads_, InputChunk::readComplete_, InputChunk::reset(), evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

796 {
797  bool stop=false;
798  unsigned int currentLumiSection = 0;
799  //threadInit_.exchange(true,std::memory_order_acquire);
800 
801  {
802  std::unique_lock<std::mutex> lk(startupLock_);
803  startupCv_.notify_one();
804  }
805 
806  while (!stop) {
807 
808  //wait for at least one free thread and chunk
809  int counter=0;
810  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
811  {
812  std::unique_lock<std::mutex> lkw(mWakeup_);
813  //sleep until woken up by condition or a timeout
814  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
815  counter++;
816  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
817  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
818  }
819  else {
820  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
821  }
822  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
823  }
824 
825  if (stop) break;
826 
827  //look for a new file
828  std::string nextFile;
829  uint32_t ls;
830  uint32_t fileSize;
831 
833 
835 
836  while (status == evf::EvFDaqDirector::noFile) {
837  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
838  stop=true;
839  break;
840  }
841  else
842  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
843 
844  if ( status == evf::EvFDaqDirector::runEnded) {
846  stop=true;
847  break;
848  }
849 
850  //queue new lumisection
851  if( getLSFromFilename_ && ls > currentLumiSection) {
852  currentLumiSection = ls;
853  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
854  }
855 
856  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
857  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
858  _exit(-1);
859  }
860 
861  int dbgcount=0;
862  if (status == evf::EvFDaqDirector::noFile) {
863  dbgcount++;
864  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
865  usleep(100000);
866  }
867  }
868  if ( status == evf::EvFDaqDirector::newFile ) {
869  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
870 
871 
872  boost::filesystem::path rawFilePath(nextFile);
873  std::string rawFile = rawFilePath.replace_extension(".raw").string();
874 
875  struct stat st;
876  stat(rawFile.c_str(),&st);
877  fileSize=st.st_size;
878 
879  int eventsInNewFile = grabNextJsonFile(nextFile);
880  if (fms_) fms_->stoppedLookingForFile(ls);
881  assert( eventsInNewFile>=0 );
882  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
883 
884  if (!singleBufferMode_) {
885  //calculate number of needed chunks
886  unsigned int neededChunks = fileSize/eventChunkSize_;
887  if (fileSize%eventChunkSize_) neededChunks++;
888 
889  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
890  fileQueue_.push(newInputFile);
891 
892  for (unsigned int i=0;i<neededChunks;i++) {
893 
894  //get thread
895  unsigned int newTid = 0xffffffff;
896  while (!workerPool_.try_pop(newTid)) {
897  usleep(100000);
898  }
899 
900  InputChunk * newChunk = nullptr;
901  while (!freeChunks_.try_pop(newChunk)) {
902  usleep(100000);
903  if (quit_threads_.load(std::memory_order_relaxed)) break;
904  }
905 
906  if (newChunk == nullptr) {
907  //return unused tid if we received shutdown (nullptr chunk)
908  if (newTid!=0xffffffff) workerPool_.push(newTid);
909  stop = true;
910  break;
911  }
912 
913  std::unique_lock<std::mutex> lk(mReader_);
914 
915  unsigned int toRead = eventChunkSize_;
916  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
917  newChunk->reset(i*eventChunkSize_,toRead,i);
918 
919  workerJob_[newTid].first=newInputFile;
920  workerJob_[newTid].second=newChunk;
921 
922  //wake up the worker thread
923  cvReader_[newTid]->notify_one();
924  }
925  }
926  else {
927  if (!eventsInNewFile) {
928  //still queue file for lumi update
929  std::unique_lock<std::mutex> lkw(mWakeup_);
930  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
931  fileQueue_.push(newInputFile);
932  cvWakeup_.notify_one();
933  return;
934  }
935  //in single-buffer mode put single chunk in the file and let the main thread read the file
936  InputChunk * newChunk;
937  //should be available immediately
938  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
939 
940  std::unique_lock<std::mutex> lkw(mWakeup_);
941 
942  unsigned int toRead = eventChunkSize_;
943  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
944  newChunk->reset(0,toRead,0);
945  newChunk->readComplete_=true;
946 
947  //push file and wakeup main thread
948  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
949  newInputFile->chunks_[0]=newChunk;
950  fileQueue_.push(newInputFile);
951  cvWakeup_.notify_one();
952  }
953  }
954  }
955  //make sure threads finish reading
956  unsigned numFinishedThreads = 0;
957  while (numFinishedThreads < workerThreads_.size()) {
958  unsigned int tid;
959  while (!workerPool_.try_pop(tid)) {usleep(10000);}
960  std::unique_lock<std::mutex> lk(mReader_);
961  thread_quit_signal[tid]=true;
962  cvReader_[tid]->notify_one();
963  numFinishedThreads++;
964  }
965  for (unsigned int i=0;i<workerThreads_.size();i++) {
966  workerThreads_[i]->join();
967  delete workerThreads_[i];
968  }
969 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
tuple path
else: Piece not in the list, fine.
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 971 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, first, i, init, prof2calltree::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

972 {
973  bool init = true;
974  threadInit_.exchange(true,std::memory_order_acquire);
975 
976  while (1) {
977 
978  std::unique_lock<std::mutex> lk(mReader_);
979  workerJob_[tid].first=nullptr;
980  workerJob_[tid].first=nullptr;
981 
982  assert(!thread_quit_signal[tid]);//should never get it here
983  workerPool_.push(tid);
984 
985  if (init) {
986  std::unique_lock<std::mutex> lk(startupLock_);
987  init = false;
988  startupCv_.notify_one();
989  }
990  cvReader_[tid]->wait(lk);
991 
992  if (thread_quit_signal[tid]) return;
993 
994  InputFile * file;
995  InputChunk * chunk;
996 
997  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
998 
999  file = workerJob_[tid].first;
1000  chunk = workerJob_[tid].second;
1001 
1002  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1003  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1004 
1005 
1006  if (fileDescriptor>=0)
1007  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1008  else
1009  {
1010  edm::LogError("FedRawDataInputSource") <<
1011  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1012  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1013  setExceptionState_=true;
1014  return;
1015 
1016  }
1017 
1018  unsigned int bufferLeft = 0;
1020  for (unsigned int i=0;i<readBlocks_;i++)
1021  {
1022  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1023  if ( last > 0 )
1024  bufferLeft+=last;
1025  if (last < eventChunkBlock_) {
1026  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1027  break;
1028  }
1029  }
1031  auto diff = end-start;
1032  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1033  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1034  close(fileDescriptor);
1035 
1036  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1037  assert(detectedFRDversion_<=3);
1038  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1039  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1040 
1041  }
1042 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
bool first
Definition: L1TdeRCT.cc:75
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::renameToNextFree ( std::string const &  fileName) const
private

Definition at line 769 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getJumpFilePath(), cmsHarvester::path, and source.

Referenced by deleteFile().

770 {
773 
774  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
775  << " to: " << destination.string();
776  boost::filesystem::rename(source,destination);
777  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
778 }
tuple path
else: Piece not in the list, fine.
evf::EvFDaqDirector * daqDirector_
std::string getJumpFilePath() const
static std::string const source
Definition: EdmProvDump.cc:43
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 791 of file FedRawDataInputSource.cc.

792 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1044 of file FedRawDataInputSource.cc.

References edm::hlt::Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1045 {
1046  quit_threads_=true;
1047  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1048 
1049 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 42 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 41 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = nullptr
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ =nullptr
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 77 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 107 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 144 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ =nullptr
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 82 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 83 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 155 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = nullptr
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

const bool FedRawDataInputSource::testModeNoBuilderUnit_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and grabNextJsonFile().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private