CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
boost::shared_ptr
< ActivityRegistry
actReg () const
 Accessor for Activity Registry. More...
 
boost::shared_ptr
< BranchIDListHelper
branchIDListHelper () const
 Accessor for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (boost::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 Non-const accessor for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
boost::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
boost::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirerresourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
boost::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
boost::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryUpdate () const
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
boost::shared_ptr
< RunPrincipal > const 
runPrincipal () const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (std::auto_ptr< FEDRawDataCollection > &) const
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (boost::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void renameToNextFree (std::string const &fileName) const
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = nullptr
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ =nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ =nullptr
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
const std::string fuOutputDir_
 
const bool getLSFromFilename_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
const edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = nullptr
 
const bool testModeNoBuilderUnit_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool verifyAdler32_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Detailed Description

Definition at line 39 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 113 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 49 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, i, InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, testModeNoBuilderUnit_, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

50  :
51  edm::RawInputSource(pset, desc),
52  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
53  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",16)*1048576),
54  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",eventChunkSize_/1048576)*1048576),
55  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",1)),
56  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
57  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
58  testModeNoBuilderUnit_(edm::Service<evf::EvFDaqDirector>()->getTestModeNoBuilderUnit()),
62  eventID_(),
65  eventsThisLumi_(0),
66  dpd_(nullptr)
67 {
68  char thishost[256];
69  gethostname(thishost, 255);
70  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
71  << std::endl << (eventChunkSize_/1048576)
72  << " MB on host " << thishost;
74  edm::LogInfo("FedRawDataInputSource") << "Test mode is ON!";
75 
77  setNewRun();
80 
81  dpd_ = new DataPointDefinition();
82  std::string defLabel = "data";
83  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
84 
85  //make sure that chunk size is N * block size
90 
91  if (!numBuffers_)
92  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
93  "no reading enabled with numBuffers parameter 0";
94 
97 
98  //het handles to DaqDirector and FastMonitoringService because it isn't acessible in readSupervisor thread
99 
100  try {
102  } catch (...){
103  edm::LogWarning("FedRawDataInputSource") << "FastMonitoringService not found";
104  assert(0);//test
105  }
106 
107  try {
109  //set DaqDirector to delete files in preGlobalEndLumi callback
112  if (fms_) daqDirector_->setFMS(fms_);
113  } catch (...){
114  edm::LogWarning("FedRawDataInputSource") << "EvFDaqDirector not found";
115  assert(0);//test
116  }
117 
118  //should delete chunks when run stops
119  for (unsigned int i=0;i<numBuffers_;i++) {
121  }
122 
123  quit_threads_ = false;
124 
125  for (unsigned int i=0;i<numConcurrentReads_;i++)
126  {
127  std::unique_lock<std::mutex> lk(startupLock_);
128  //issue a memory fence here and in threads (constructor was segfaulting without this)
129  thread_quit_signal.push_back(false);
130  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
131  cvReader_.push_back(new std::condition_variable);
132  threadInit_.store(false,std::memory_order_release);
133  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
134  startupCv_.wait(lk);
135  }
136 
137  runAuxiliary()->setProcessHistoryID(processHistoryID_);
138 }
int i
Definition: DBlmapReader.cc:9
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
jsoncollector::DataPointDefinition * dpd_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
std::vector< bool > thread_quit_signal
const edm::RunNumber_t runNumber_
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
const std::string fuOutputDir_
std::list< std::pair< int, InputFile * > > filesToDelete_
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
std::condition_variable startupCv_
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Non-const accessor for process history registry.
Definition: InputSource.h:174
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void readWorker(unsigned int tid)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 140 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

141 {
142  quit_threads_=true;
143 
144  //delete any remaining open files
145  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
146  deleteFile(it->second->fileName_);
147  delete it->second;
148  }
150  readSupervisorThread_->join();
151  }
152  else {
153  //join aux threads in case the supervisor thread was not started
154  for (unsigned int i=0;i<workerThreads_.size();i++) {
155  std::unique_lock<std::mutex> lk(mReader_);
156  thread_quit_signal[i]=true;
157  cvReader_[i]->notify_one();
158  lk.unlock();
159  workerThreads_[i]->join();
160  delete workerThreads_[i];
161  }
162  }
163  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
164  /*
165  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
166  InputChunk *ch;
167  while (!freeChunks_.try_pop(ch)) {}
168  delete ch;
169  }
170  */
171 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< bool > thread_quit_signal
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 173 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, event_, eventID_, eventsThisLumi_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, evf::FastMonitoringService::reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, and AlCaHLTBitMon_QueryRunRegistry::string.

174 {
176  {
177  //this thread opens new files and dispatches reading to worker readers
178  //threadInit_.store(false,std::memory_order_release);
179  std::unique_lock<std::mutex> lk(startupLock_);
180  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
182  startupCv_.wait(lk);
183  }
184  switch (nextEvent() ) {
186 
187  //maybe create EoL file in working directory before ending run
188  struct stat buf;
189  if ( currentLumiSection_ > 0 ) {
190  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
191  if (eolFound) {
193  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
194  if ( !found ) {
196  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
197  close(eol_fd);
199  }
200  }
201  }
202  //also create EoR file in FU data directory
203  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
204  if (!eorFound) {
205  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
206  close(eor_fd);
207  }
209  eventsThisLumi_=0;
211  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
212  return false;
213  }
215  //this is not reachable
216  return true;
217  }
219  //std::cout << "--------------NEW LUMI---------------" << std::endl;
220  return true;
221  }
222  default: {
223  if (!getLSFromFilename_) {
224  //get new lumi from file header
225  if (event_->lumi() > currentLumiSection_) {
227  eventsThisLumi_=0;
228  maybeOpenNewLumiSection( event_->lumi() );
229  }
230  }
232 
233  setEventCached();
234 
235  return true;
236  }
237  }
238 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:381
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
evf::EvFDaqDirector * daqDirector_
evf::FastMonitoringService * fms_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 522 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, convertXMLtoSQLite_cfg::fileName, LogDebug, getHLTPrescaleColumns::path, python.multivaluedict::remove(), renameToNextFree(), and testModeNoBuilderUnit_.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

523 {
524  const boost::filesystem::path filePath(fileName);
525  if (!testModeNoBuilderUnit_) {
526  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
527  try {
528  //sometimes this fails but file gets deleted
529  boost::filesystem::remove(filePath);
530  }
531  catch (const boost::filesystem::filesystem_error& ex)
532  {
533  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
534  << ". Trying again.";
535  usleep(100000);
536  try {
537  boost::filesystem::remove(filePath);
538  }
539  catch (...) {/*file gets deleted first time but exception is still thrown*/}
540  }
541  catch (std::exception& ex)
542  {
543  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
544  << ". Trying again.";
545  usleep(100000);
546  try {
547  boost::filesystem::remove(filePath);
548  } catch (...) {/*file gets deleted first time but exception is still thrown*/}
549  }
550  } else {
552  }
553 }
#define LogDebug(id)
void renameToNextFree(std::string const &fileName) const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 68 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( std::auto_ptr< FEDRawDataCollection > &  rawData) const
private

Definition at line 604 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_setformat(), FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, evf::evtn::getgpshigh(), evf::evtn::getgpslow(), FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), and fedh_struct::sourceid.

Referenced by read().

605 {
606  edm::Timestamp tstamp;
607  uint32_t eventSize = event_->eventSize();
608  char* event = (char*)event_->payload();
609 
610  while (eventSize > 0) {
611  eventSize -= sizeof(fedt_t);
612  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
613  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
614  eventSize -= (fedSize - sizeof(fedh_t));
615  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
616  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
617  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
619  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
620  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
621  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
622  }
623  FEDRawData& fedData = rawData->FEDData(fedId);
624  fedData.resize(fedSize);
625  memcpy(fedData.data(), event + eventSize, fedSize);
626  }
627  assert(eventSize == 0);
628 
629  return tstamp;
630 }
unsigned int getgpshigh(const unsigned char *)
struct fedh_struct fedh_t
unsigned int sourceid
Definition: fed_header.h:32
void evm_board_setformat(size_t size)
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 289 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), event_, eventChunkSize_, eventsThisLumi_, edm::hlt::Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readNextChunkIntoBuffer(), evf::FastMonitoringService::reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, and InputFile::waitForChunk().

Referenced by nextEvent().

290 {
291  const size_t headerSize = (4 + 1024) * sizeof(uint32); //minimal size to fit any version of FRDEventHeader
292 
294  if (!currentFile_)
295  {
296  if (!streamFileTrackerPtr_) {
300  }
301 
303  if (!fileQueue_.try_pop(currentFile_))
304  {
305  //sleep until wakeup (only in single-buffer mode) or timeout
306  std::unique_lock<std::mutex> lkw(mWakeup_);
307  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
309  }
310  status = currentFile_->status_;
311  if ( status == evf::EvFDaqDirector::runEnded)
312  {
313  delete currentFile_;
314  currentFile_=nullptr;
315  return status;
316  }
317 
318  else if (status == evf::EvFDaqDirector::newLumi)
319  {
320  if (getLSFromFilename_) {
323  eventsThisLumi_=0;
325  }
326  }
327  else {//let this be picked up from next event
329  }
330 
331  delete currentFile_;
332  currentFile_=nullptr;
333  return status;
334  }
335  else if (status == evf::EvFDaqDirector::newFile) {
338  }
339  else
340  assert(0);
341  }
342 
343  //file is empty
344  if (!currentFile_->fileSize_) {
345  //try to open new lumi
346  assert(currentFile_->nChunks_==0);
347  if (getLSFromFilename_)
350  eventsThisLumi_=0;
352  }
353  //immediately delete empty file
355  delete currentFile_;
356  currentFile_=nullptr;
358  }
359 
360  //file is finished
362  //release last chunk (it is never released elsewhere)
365  {
366  throw cms::Exception("RuntimeError")
367  << "Fully processed " << currentFile_->nProcessed_
368  << " from the file " << currentFile_->fileName_
369  << " but according to BU JSON there should be "
370  << currentFile_->nEvents_ << " events";
371  }
372  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
373  if (singleBufferMode_) {
374  std::unique_lock<std::mutex> lkw(mWakeup_);
375  cvWakeup_.notify_one();
376  }
379  //put the file in pending delete list;
380  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
381  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
382  }
383  else {
384  //in single-thread and stream jobs, events are already processed
386  delete currentFile_;
387  }
388  currentFile_=nullptr;
390  }
391 
392 
393  //file is too short
394  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < headerSize)
395  {
396  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
397  "Premature end of input file while reading event header";
398  }
399 
400  if (singleBufferMode_) {
401 
402  //should already be there
404  usleep(10000);
406  }
407 
408  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
409 
410  if (bufferInputRead_ < headerSize)
411  {
413  //recalculate chunk position
414  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
415  if ( bufferInputRead_ < headerSize )
416  {
417  throw cms::Exception("FedRawDataInputSource::cacheNextEvent") <<
418  "Premature end of input file while reading event header";
419  }
420  }
421 
422  event_.reset( new FRDEventMsgView(dataPosition) );
423  if (event_->size()>eventChunkSize_) {
424  throw cms::Exception("FedRawDataInputSource::nextEvent")
425  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
426  << " run:" << event_->run() << " of size:" << event_->size()
427  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
428  }
429 
430  const uint32_t msgSize = event_->size()-headerSize;
431 
433  {
434  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
435  "Premature end of input file while reading event data";
436  }
437  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
439  //recalculate chunk position
440  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
441  event_.reset( new FRDEventMsgView(dataPosition) );
442  }
443  currentFile_->bufferPosition_ += event_->size();
444  currentFile_->chunkPosition_ += event_->size();
445  //last chunk is released when this function is invoked next time
446 
447  }
448  //multibuffer mode:
449  else
450  {
451  //wait for the current chunk to become added to the vector
453  usleep(10000);
455  }
456 
457  //check if header is at the boundary of two chunks
458  chunkIsFree_ = false;
459  unsigned char *dataPosition;
460 
461  //read header, copy it to a single chunk if necessary
462  bool chunkEnd = currentFile_->advance(dataPosition,headerSize);
463 
464  event_.reset( new FRDEventMsgView(dataPosition) );
465  if (event_->size()>eventChunkSize_) {
466  throw cms::Exception("FedRawDataInputSource::nextEvent")
467  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
468  << " run:" << event_->run() << " of size:" << event_->size()
469  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
470  }
471 
472  const uint32_t msgSize = event_->size()-headerSize;
473 
475  {
476  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
477  "Premature end of input file while reading event data";
478  }
479 
480  if (chunkEnd) {
481  //header was at the chunk boundary, we will have to move payload as well
482  currentFile_->moveToPreviousChunk(msgSize,headerSize);
483  chunkIsFree_ = true;
484  }
485  else {
486  //header was contiguous, but check if payload fits the chunk
487  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
488  //rewind to header start position
489  currentFile_->rewindChunk(headerSize);
490  //copy event to a chunk start and move pointers
491  chunkEnd = currentFile_->advance(dataPosition,headerSize+msgSize);
492  assert(chunkEnd);
493  chunkIsFree_=true;
494  //header is moved
495  event_.reset( new FRDEventMsgView(dataPosition) );
496  }
497  else {
498  //everything is in a single chunk, only move pointers forward
499  chunkEnd = currentFile_->advance(dataPosition,msgSize);
500  assert(!chunkEnd);
501  chunkIsFree_=false;
502  }
503  }
504  }//end multibuffer mode
505 
506  if ( verifyAdler32_ && event_->version() >= 3 )
507  {
508  uint32_t adler = adler32(0L,Z_NULL,0);
509  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
510 
511  if ( adler != event_->adler32() ) {
512  throw cms::Exception("FedRawDataInputSource::nextEvent") <<
513  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
514  " but calculated 0x" << adler;
515  }
516  }
518 
520 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void readNextChunkIntoBuffer(InputFile *file)
unsigned int nEvents_
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 632 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, alignCSCRings::e, cppFunctionSkipper::exception, edm::hlt::Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), getHLTPrescaleColumns::path, matplotRender::reader, python.multivaluedict::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, testModeNoBuilderUnit_, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

633 {
635  try {
636  // assemble json destination path
638 
639  //TODO:should be ported to use fffnaming
640  std::ostringstream fileNameWithPID;
641  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
642  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
643  jsonDestPath /= fileNameWithPID.str();
644 
645  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
646  << jsonDestPath;
647 
649  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
650  else {
651  try {
652  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
653  }
654  catch (const boost::filesystem::filesystem_error& ex)
655  {
656  // Input dir gone?
657  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
658  // << " Maybe the file is not yet visible by FU. Trying again in one second";
659  sleep(1);
660  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
661  }
663 
664 
665  try {
666  //sometimes this fails but file gets deleted
667  boost::filesystem::remove(jsonSourcePath);
668  }
669  catch (const boost::filesystem::filesystem_error& ex)
670  {
671  // Input dir gone?
672  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
673  }
674  catch (std::exception& ex)
675  {
676  // Input dir gone?
677  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
678  }
679 
680  }
681 
682  boost::filesystem::ifstream ij(jsonDestPath);
683  Json::Value deserializeRoot;
685 
686  if (!reader.parse(ij, deserializeRoot))
687  throw std::runtime_error("Cannot deserialize input JSON file");
688 
689  //read BU JSON
691  DataPoint dp;
692  dp.deserialize(deserializeRoot);
693  bool success = false;
694  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
695  if (dpd_->getNames().at(i)=="NEvents")
696  if (i<dp.getData().size()) {
697  data = dp.getData()[i];
698  success=true;
699  }
700  }
701  if (!success) {
702  if (dp.getData().size())
703  data = dp.getData()[0];
704  else
705  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
706  " error reading number of events from BU JSON -: No input value " << data;
707  }
708  return boost::lexical_cast<int>(data);
709 
710  }
711  catch (const boost::filesystem::filesystem_error& ex)
712  {
713  // Input dir gone?
715  edm::LogError("FedRawDataInputSource") << "grabNextFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
716  << " - Maybe the BU run dir disappeared? Ending process with code 0...";
717  _exit(-1);
718  }
719  catch (std::runtime_error e)
720  {
721  // Another process grabbed the file and NFS did not register this
723  edm::LogError("FedRawDataInputSource") << "grabNextFile - runtime Exception -: " << e.what();
724  }
725 
726  catch( boost::bad_lexical_cast const& ) {
727  edm::LogError("FedRawDataInputSource") << "grabNextFile - error parsing number of events from BU JSON. "
728  << "Input value is -: " << data;
729  }
730 
731  catch (std::exception e)
732  {
733  // BU run directory disappeared?
735  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
736  }
737 
738  return -1;
739 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
const std::string fuOutputDir_
auto dp
Definition: deltaR.h:24
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 240 of file FedRawDataInputSource.cc.

References currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

241 {
243  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
244 
245  if ( currentLumiSection_ > 0 ) {
246  const std::string fuEoLS =
248  struct stat buf;
249  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
250  if ( !found ) {
252  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
253  close(eol_fd);
255  }
256  }
257 
258  currentLumiSection_ = lumiSection;
259 
261 
262  timeval tv;
263  gettimeofday(&tv, 0);
264  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
265 
266  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
268  runAuxiliary()->run(),
269  lumiSection, lsopentime,
271 
272  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
273  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
274 
275  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
276  }
277 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:595
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:354
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:601
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 279 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

280 {
282  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
283  {
284  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
285  }
286  return status;
287 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::postForkReacquireResources ( boost::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 755 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

756 {
757  InputSource::rewind();
761 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
const edm::RunNumber_t runNumber_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 752 of file FedRawDataInputSource.cc.

753 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 555 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, i, edm::RawInputSource::makeEvent(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), edm::EventAuxiliary::setProcessHistoryID(), and streamFileTrackerPtr_.

Referenced by Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

556 {
557  std::auto_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
558  edm::Timestamp tstamp = fillFEDRawDataCollection(rawData);
559 
560  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
562  aux.setProcessHistoryID(processHistoryID_);
563  makeEvent(eventPrincipal, aux);
564 
567 
568  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
569  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
570  // daqProvenanceHelper_.dummyProvenance_);
571 
572  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), edp,
574 
575  eventsThisLumi_++;
576 
577  //this old file check runs no more often than every 10 events
578  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
579  //delete files that are not in processing
580  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
581  auto it = filesToDelete_.begin();
582  while (it!=filesToDelete_.end()) {
583  bool fileIsBeingProcessed = false;
584  for (unsigned int i=0;i<nStreams_;i++) {
585  if (it->first == streamFileTrackerPtr_->at(i)) {
586  fileIsBeingProcessed = true;
587  break;
588  }
589  }
590  if (!fileIsBeingProcessed) {
591  deleteFile(it->second->fileName_);
592  delete it->second;
593  it = filesToDelete_.erase(it);
594  }
595  else it++;
596  }
597 
598  }
600  chunkIsFree_=false;
601  return;
602 }
int i
Definition: DBlmapReader.cc:9
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:214
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
edm::Timestamp fillFEDRawDataCollection(std::auto_ptr< FEDRawDataCollection > &) const
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
void put(BranchDescription const &bd, WrapperOwningHolder const &edp, ProductProvenance const &productProvenance)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1069 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, edm::hlt::Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, prof2calltree::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1070 {
1071 
1072  if (fileDescriptor_<0) {
1073  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1074  bufferInputRead_ = 0;
1075  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1076  if (fileDescriptor_>=0)
1077  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1078  else
1079  {
1080  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1081  << file->fileName_ << " fd:" << fileDescriptor_;
1082  }
1083  }
1084 
1085  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1086  uint32_t existingSize = 0;
1087  for (unsigned int i=0;i<readBlocks_;i++)
1088  {
1089  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1090  bufferInputRead_+=last;
1091  existingSize+=last;
1092  }
1093  }
1094  else {
1095  const uint32_t chunksize = file->chunkPosition_;
1096  const uint32_t blockcount=chunksize/eventChunkBlock_;
1097  const uint32_t leftsize = chunksize%eventChunkBlock_;
1098  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1099  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1100 
1101  for (uint32_t i=0;i<blockcount;i++) {
1102  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1103  bufferInputRead_+=last;
1104  existingSize+=last;
1105  }
1106  if (leftsize) {
1107  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1108  bufferInputRead_+=last;
1109  existingSize+=last;
1110  }
1111  file->chunkPosition_=0;//data was moved to beginning of the chunk
1112  }
1113  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1114  if (fileDescriptor_!=-1)
1115  {
1116  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1117  close(fileDescriptor_);
1118  fileDescriptor_=-1;
1119  }
1120  }
1121 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 767 of file FedRawDataInputSource.cc.

References InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileQueue_, fms_, freeChunks_, getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, python.rootplot.utilities::ls(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, getHLTPrescaleColumns::path, quit_threads_, InputChunk::readComplete_, InputChunk::reset(), evf::EvFDaqDirector::runEnded, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

768 {
769  bool stop=false;
770  unsigned int currentLumiSection = 0;
771  //threadInit_.exchange(true,std::memory_order_acquire);
772 
773  {
774  std::unique_lock<std::mutex> lk(startupLock_);
775  startupCv_.notify_one();
776  }
777 
778  while (!stop) {
779 
780  //wait for at least one free thread and chunk
781  int counter=0;
782  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty())
783  {
784  std::unique_lock<std::mutex> lkw(mWakeup_);
785  //sleep until woken up by condition or a timeout
786  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
787  counter++;
788  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
789  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
790  }
791  else {
792  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
793  }
794  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
795  }
796 
797  if (stop) break;
798 
799  //look for a new file
800  std::string nextFile;
801  uint32_t ls;
802  uint32_t fileSize;
803 
805 
807 
808  while (status == evf::EvFDaqDirector::noFile) {
809  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
810  stop=true;
811  break;
812  }
813  else
814  status = daqDirector_->updateFuLock(ls,nextFile,fileSize);
815 
816  if ( status == evf::EvFDaqDirector::runEnded) {
818  stop=true;
819  break;
820  }
821 
822  //queue new lumisection
823  if( getLSFromFilename_ && ls > currentLumiSection) {
824  currentLumiSection = ls;
825  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
826  }
827 
828  int dbgcount=0;
829  if (status == evf::EvFDaqDirector::noFile) {
830  dbgcount++;
831  //if (!(dbgcount%20)) edm::LogInfo("FedRawDataInputSource") << "No file for me... sleep and try again...";
832  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
833  usleep(100000);
834  }
835  }
836  if ( status == evf::EvFDaqDirector::newFile ) {
837  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
838 
839 
840  boost::filesystem::path rawFilePath(nextFile);
841  std::string rawFile = rawFilePath.replace_extension(".raw").string();
842 
843  struct stat st;
844  stat(rawFile.c_str(),&st);
845  fileSize=st.st_size;
846 
847  int eventsInNewFile = grabNextJsonFile(nextFile);
848  if (fms_) fms_->stoppedLookingForFile(ls);
849  assert( eventsInNewFile>=0 );
850  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
851 
852  if (!singleBufferMode_) {
853  //calculate number of needed chunks
854  unsigned int neededChunks = fileSize/eventChunkSize_;
855  if (fileSize%eventChunkSize_) neededChunks++;
856 
857  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
858  fileQueue_.push(newInputFile);
859 
860  for (unsigned int i=0;i<neededChunks;i++) {
861 
862  //get thread
863  unsigned int newTid = 0xffffffff;
864  while (!workerPool_.try_pop(newTid)) {
865  usleep(100000);
866  }
867 
868  InputChunk * newChunk = nullptr;
869  while (!freeChunks_.try_pop(newChunk)) {
870  usleep(100000);
871  if (quit_threads_.load(std::memory_order_relaxed)) break;
872  }
873 
874  if (newChunk == nullptr) {
875  //return unused tid if we received shutdown (nullptr chunk)
876  if (newTid!=0xffffffff) workerPool_.push(newTid);
877  stop = true;
878  break;
879  }
880 
881  std::unique_lock<std::mutex> lk(mReader_);
882 
883  unsigned int toRead = eventChunkSize_;
884  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
885  newChunk->reset(i*eventChunkSize_,toRead,i);
886 
887  workerJob_[newTid].first=newInputFile;
888  workerJob_[newTid].second=newChunk;
889 
890  //wake up the worker thread
891  cvReader_[newTid]->notify_one();
892  }
893  }
894  else {
895  if (!eventsInNewFile) {
896  //still queue file for lumi update
897  std::unique_lock<std::mutex> lkw(mWakeup_);
898  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
899  fileQueue_.push(newInputFile);
900  cvWakeup_.notify_one();
901  return;
902  }
903  //in single-buffer mode put single chunk in the file and let the main thread read the file
904  InputChunk * newChunk;
905  //should be available immediately
906  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
907 
908  std::unique_lock<std::mutex> lkw(mWakeup_);
909 
910  unsigned int toRead = eventChunkSize_;
911  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
912  newChunk->reset(0,toRead,0);
913  newChunk->readComplete_=true;
914 
915  //push file and wakeup main thread
916  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
917  newInputFile->chunks_[0]=newChunk;
918  fileQueue_.push(newInputFile);
919  cvWakeup_.notify_one();
920  }
921  }
922  }
923  //make sure threads finish reading
924  unsigned numFinishedThreads = 0;
925  while (numFinishedThreads < workerThreads_.size()) {
926  unsigned int tid;
927  while (!workerPool_.try_pop(tid)) {usleep(10000);}
928  std::unique_lock<std::mutex> lk(mReader_);
929  thread_quit_signal[tid]=true;
930  cvReader_[tid]->notify_one();
931  numFinishedThreads++;
932  }
933  for (unsigned int i=0;i<workerThreads_.size();i++) {
934  workerThreads_[i]->join();
935  delete workerThreads_[i];
936  }
937 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
std::vector< std::condition_variable * > cvReader_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize)
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
std::vector< bool > thread_quit_signal
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
tbb::concurrent_queue< InputChunk * > freeChunks_
tuple status
Definition: ntuplemaker.py:245
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 939 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, diffTreeTool::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, first, i, init, prof2calltree::last, LogDebug, mReader_, cmsPerfSuiteHarvest::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

940 {
941  bool init = true;
942  threadInit_.exchange(true,std::memory_order_acquire);
943 
944  while (1) {
945 
946  std::unique_lock<std::mutex> lk(mReader_);
947  workerJob_[tid].first=nullptr;
948  workerJob_[tid].first=nullptr;
949 
950  assert(!thread_quit_signal[tid]);//should never get it here
951  workerPool_.push(tid);
952 
953  if (init) {
954  std::unique_lock<std::mutex> lk(startupLock_);
955  init = false;
956  startupCv_.notify_one();
957  }
958  cvReader_[tid]->wait(lk);
959 
960  if (thread_quit_signal[tid]) return;
961 
962  InputFile * file;
963  InputChunk * chunk;
964 
965  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
966 
967  file = workerJob_[tid].first;
968  chunk = workerJob_[tid].second;
969 
970  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
971  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
972 
973  if (fileDescriptor>=1)
974  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
975  else
976  {
977  edm::LogError("FedRawDataInputSource") <<
978  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
979  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
980  setExceptionState_=true;
981  return;
982 
983  }
984 
985  unsigned int bufferLeft = 0;
987  for (unsigned int i=0;i<readBlocks_;i++)
988  {
989  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
990  if ( last > 0 )
991  bufferLeft+=last;
992  if (last < eventChunkBlock_) {
993  assert(chunk->usedSize_==i*eventChunkBlock_+last);
994  break;
995  }
996  }
998  auto diff = end-start;
999  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1000  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1001  close(fileDescriptor);
1002 
1003  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1004  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1005 
1006  }
1007 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
tuple start
Check for commandline option errors.
Definition: dqm_diff.py:58
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
int init
Definition: HydjetWrapper.h:62
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
std::vector< bool > thread_quit_signal
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
bool first
Definition: L1TdeRCT.cc:75
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
void FedRawDataInputSource::renameToNextFree ( std::string const &  fileName) const
private

Definition at line 741 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getJumpFilePath(), getHLTPrescaleColumns::path, and source.

Referenced by deleteFile().

742 {
745 
746  edm::LogInfo("FedRawDataInputSource") << "Instead of delete, RENAME -: " << fileName
747  << " to: " << destination.string();
748  boost::filesystem::rename(source,destination);
749  boost::filesystem::rename(source.replace_extension(".jsn"),destination.replace_extension(".jsn"));
750 }
evf::EvFDaqDirector * daqDirector_
std::string getJumpFilePath() const
static std::string const source
Definition: EdmProvDump.cc:43
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 763 of file FedRawDataInputSource.cc.

764 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1009 of file FedRawDataInputSource.cc.

References edm::hlt::Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1010 {
1011  quit_threads_=true;
1012  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1013 
1014 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 42 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 41 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = nullptr
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and maybeOpenNewLumiSection().

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ =nullptr
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 77 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 103 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 139 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ =nullptr
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

const std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 82 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 83 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

const edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 133 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = nullptr
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

const bool FedRawDataInputSource::testModeNoBuilderUnit_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and grabNextJsonFile().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private