FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr
< ProductRegistry > & 
productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirer * resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks the current event as cached. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int,
unsigned int > 
sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< bool > thread_quit_signal
 
std::atomic< bool > threadInit_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair< InputFile *, InputChunk * > FedRawDataInputSource::ReaderInfo
private

Definition at line 136 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListMode_, filesToDelete_, fms_, freeChunks_, i, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, cppFunctionSkipper::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
71  eventID_(),
74  tcds_pointer_(0),
75  eventsThisLumi_(0),
76  dpd_(nullptr)
77 {
78  char thishost[256];
79  gethostname(thishost, 255);
80  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
81  << std::endl << (eventChunkSize_/1048576)
82  << " MB on host " << thishost;
83 
84  long autoRunNumber = -1;
85  if (fileListMode_) {
86  autoRunNumber = initFileList();
87  if (autoRunNumber<0)
88  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
89  //override run number
90  runNumber_ = (edm::RunNumber_t)autoRunNumber;
91  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
92  }
93 
95  setNewRun();
96  //todo:autodetect from file name (assert if names differ)
99 
100  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
101  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
102  struct stat statbuf;
103  if (stat(defPath_.c_str(), &statbuf)) {
104  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
105  if (stat(defPath_.c_str(), &statbuf)) {
106  defPath_ = defPathSuffix;
107  }
108  }
109 
110  dpd_ = new DataPointDefinition();
111  std::string defLabel = "data";
112  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
113 
114  //make sure that chunk size is N * block size
119 
120  if (!numBuffers_)
121  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
122  "no reading enabled with numBuffers parameter 0";
123 
127 
128  if (!crc32c_hw_test())
129  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
130 
131  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
132  if (fileListMode_) {
133  try {
135  } catch(...) {}
136  }
137  else {
139  if (!fms_) {
140  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
141  }
142  }
143 
145  if (!daqDirector_)
146  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
147 
148  //set DaqDirector to delete files in preGlobalEndLumi callback
150  if (fms_) {
152  fms_->setInputSource(this);
153  }
154  //should delete chunks when run stops
155  for (unsigned int i=0;i<numBuffers_;i++) {
157  }
158 
159  quit_threads_ = false;
160 
161  for (unsigned int i=0;i<numConcurrentReads_;i++)
162  {
163  std::unique_lock<std::mutex> lk(startupLock_);
164  //issue a memory fence here and in threads (constructor was segfaulting without this)
165  thread_quit_signal.push_back(false);
166  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
167  cvReader_.push_back(new std::condition_variable);
168  threadInit_.store(false,std::memory_order_release);
169  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
170  startupCv_.wait(lk);
171  }
172 
173  runAuxiliary()->setProcessHistoryID(processHistoryID_);
174 }
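
The loop that closes the constructor starts one reader thread at a time and blocks on startupCv_ until that thread has reported in. The handshake can be sketched in isolation as follows. This is only an illustration: WorkerPool, ready_ and started_ are hypothetical names that do not exist in FedRawDataInputSource, and the sketch waits with a predicate (a variation on the listing, which calls wait(lk) without one) so that a spurious wake-up cannot release the constructor early.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the start-up handshake; not CMSSW code.
class WorkerPool {
public:
  explicit WorkerPool(unsigned int nWorkers) {
    for (unsigned int i = 0; i < nWorkers; ++i) {
      // Take the lock before launching, so the worker cannot signal
      // before the constructor is waiting.
      std::unique_lock<std::mutex> lk(startupLock_);
      ready_ = false;
      threads_.emplace_back(&WorkerPool::run, this, i);
      // wait() releases the lock; the worker can now run and signal.
      startupCv_.wait(lk, [this] { return ready_; });
    }
  }

  ~WorkerPool() {
    for (auto& t : threads_) t.join();
  }

  unsigned int started() const { return started_; }

private:
  void run(unsigned int /*tid*/) {
    std::lock_guard<std::mutex> lk(startupLock_);
    ++started_;
    ready_ = true;
    startupCv_.notify_one();
  }

  std::mutex startupLock_;
  std::condition_variable startupCv_;
  bool ready_ = false;
  unsigned int started_ = 0;
  std::vector<std::thread> threads_;
};
```

Because the lock is taken before each thread is created, every worker is guaranteed to have run its initialisation by the time the constructor moves on to the next one.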
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 176 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

177 {
178  quit_threads_=true;
179 
180  //delete any remaining open files
181  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
182  deleteFile(it->second->fileName_);
183  delete it->second;
184  }
186  readSupervisorThread_->join();
187  }
188  else {
189  //join aux threads in case the supervisor thread was not started
190  for (unsigned int i=0;i<workerThreads_.size();i++) {
191  std::unique_lock<std::mutex> lk(mReader_);
192  thread_quit_signal[i]=true;
193  cvReader_[i]->notify_one();
194  lk.unlock();
195  workerThreads_[i]->join();
196  delete workerThreads_[i];
197  }
198  }
199  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
200  /*
201  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
202  InputChunk *ch;
203  while (!freeChunks_.try_pop(ch)) {}
204  delete ch;
205  }
206  */
207 }
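
When the supervisor thread was never started, the destructor stops each worker itself: it sets the worker's quit flag under the reader mutex, notifies the worker's condition variable, and joins the thread. A reduced sketch of that shutdown sequence for a single worker is shown below. QuittableWorker and its members are hypothetical names chosen for illustration, and the worker here does nothing except wait for the quit request.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical single-worker illustration of "flag, notify, join".
class QuittableWorker {
public:
  QuittableWorker() : thread_(&QuittableWorker::loop, this) {}
  ~QuittableWorker() { stop(); }

  void stop() {
    {
      std::lock_guard<std::mutex> lk(m_);
      quit_ = true;  // set under the mutex so the waiter cannot miss it
    }
    cv_.notify_one();
    if (thread_.joinable()) thread_.join();
  }

  bool finished() const { return finished_; }

private:
  void loop() {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait(lk, [this] { return quit_; });
    finished_ = true;
  }

  std::mutex m_;
  std::condition_variable cv_;
  bool quit_ = false;
  bool finished_ = false;
  std::thread thread_;  // declared last: the members above exist before the thread starts
};
```

Calling stop() twice is harmless in this sketch because the second call finds the thread no longer joinable.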

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 226 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, newFWLiteAna::found, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

227 {
229  {
230  //late init of directory variable
232 
233  //this thread opens new files and dispatches reading to worker readers
234  //threadInit_.store(false,std::memory_order_release);
235  std::unique_lock<std::mutex> lk(startupLock_);
236  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
238  startupCv_.wait(lk);
239  }
240  //signal hltd to start event accounting
243 
244  switch (nextEvent() ) {
246 
247  //maybe create EoL file in working directory before ending run
248  struct stat buf;
249  if ( currentLumiSection_ > 0) {
250  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
251  if (eolFound) {
253  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
254  if ( !found ) {
256  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
257  close(eol_fd);
259  }
260  }
261  }
262  //also create EoR file in FU data directory
263  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
264  if (!eorFound) {
265  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
266  close(eor_fd);
267  }
269  eventsThisLumi_=0;
271  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
272  return false;
273  }
275  //this is not reachable
276  return true;
277  }
279  //std::cout << "--------------NEW LUMI---------------" << std::endl;
280  return true;
281  }
282  default: {
283  if (!getLSFromFilename_) {
284  //get new lumi from file header
285  if (event_->lumi() > currentLumiSection_) {
287  eventsThisLumi_=0;
288  maybeOpenNewLumiSection( event_->lumi() );
289  }
290  }
291  eventRunNumber_=event_->run();
292  L1EventID_ = event_->event();
293 
294  setEventCached();
295 
296  return true;
297  }
298  }
299 }
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 301 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

{
  //used for backpressure mechanisms and monitoring
  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
  struct stat buf;
  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
    int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
    close(bol_fd);
  }
}
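
The function creates an empty marker file and, when checkIfExists is set, skips creation if the file is already present. A stand-alone sketch of the same pattern is given below. touchMarkerFile is a hypothetical helper, not a CMSSW function, and unlike the listing it checks the descriptor returned by open() and reports failure to the caller.

```cpp
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper: create an empty marker file if needed.
// Returns true when the file exists after the call.
bool touchMarkerFile(const std::string& path, bool checkIfExists) {
  struct stat buf;
  if (checkIfExists && stat(path.c_str(), &buf) == 0)
    return true;  // already there, nothing to do

  const int fd = open(path.c_str(), O_RDWR | O_CREAT,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0)
    return false;  // e.g. missing directory or no permission
  close(fd);
  return true;
}
```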
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 624 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, fed_dqm_sourceclient-live_cfg::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

{
  //no deletion in file list mode
  if (fileListMode_) return;

  const boost::filesystem::path filePath(fileName);
  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
  try {
    //sometimes this fails but file gets deleted
    boost::filesystem::remove(filePath);
  }
  catch (const boost::filesystem::filesystem_error& ex)
  {
    edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
                                           << ". Trying again.";
    usleep(100000);
    try {
      boost::filesystem::remove(filePath);
    }
    catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
  }
  catch (std::exception& ex)
  {
    edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
                                           << ". Trying again.";
    usleep(100000);
    try {
      boost::filesystem::remove(filePath);
    } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
  }
}
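
The listing retries the removal once after a 100 ms pause and tolerates the case where the first attempt already deleted the file. The same idea can be sketched with plain POSIX calls, as below. unlinkWithRetry is a hypothetical helper that swaps boost::filesystem::remove for unlink() and uses errno instead of exceptions; it is not how FedRawDataInputSource is implemented.

```cpp
#include <cerrno>
#include <string>
#include <unistd.h>

// Hypothetical helper: remove a file, retrying once after a short pause.
// A file that is already gone (ENOENT) counts as success.
bool unlinkWithRetry(const std::string& path, useconds_t retryDelayUs = 100000) {
  for (int attempt = 0; attempt < 2; ++attempt) {
    if (unlink(path.c_str()) == 0 || errno == ENOENT)
      return true;
    if (attempt == 0)
      usleep(retryDelayUs);  // give the file system time to settle
  }
  return false;
}
```

Treating ENOENT as success mirrors the comments in the listing, which note that the file is sometimes deleted even though the first call reports an error.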
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 74 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 209 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

210 {
212  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
213  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
214  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
215  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
216  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
217  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
218  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
219  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
220  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
221  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
222  desc.setAllowAnything();
223  descriptions.add("source", desc);
224 }
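
The eventChunkSize and eventChunkBlock parameters are given in MB; the constructor multiplies both by 1048576, and the block size must not exceed the buffer size. A small sketch of such a conversion is shown below. ChunkConfig and makeChunkConfig are hypothetical, and rounding the chunk down to a whole number of blocks is an assumed policy chosen for illustration, since the corresponding constructor lines are not shown on this page.

```cpp
#include <cstdint>
#include <stdexcept>

// Hypothetical helper, not part of CMSSW: turns the two MB-valued
// parameters into byte counts.
struct ChunkConfig {
  std::uint64_t chunkBytes;      // read-ahead buffer size in bytes
  std::uint64_t blockBytes;      // size of a single read call in bytes
  std::uint64_t blocksPerChunk;  // read calls needed to fill one chunk
};

ChunkConfig makeChunkConfig(unsigned int chunkMB, unsigned int blockMB) {
  constexpr std::uint64_t kBytesPerMB = 1048576;  // same factor as in the constructor
  if (blockMB == 0 || blockMB > chunkMB)
    throw std::invalid_argument("block size must be non-zero and no larger than the chunk size");
  const std::uint64_t blocks = chunkMB / blockMB;
  // Assumed policy: round the chunk down to a whole number of blocks.
  return ChunkConfig{blocks * blockMB * kBytesPerMB, blockMB * kBytesPerMB, blocks};
}
```

With the documented defaults (32 and 32) this yields a single 32 MB block per chunk.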
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 726 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), stage2MP7BufferRaw_cfi::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, and tcds_pointer_.

Referenced by read().

{
  edm::TimeValue_t time;
  timeval stv;
  gettimeofday(&stv,0);
  time = stv.tv_sec;
  time = (time << 32) + stv.tv_usec;
  edm::Timestamp tstamp(time);

  uint32_t eventSize = event_->eventSize();
  char* event = (char*)event_->payload();
  GTPEventID_=0;
  tcds_pointer_ = 0;
  while (eventSize > 0) {
    assert(eventSize>=sizeof(fedt_t));
    eventSize -= sizeof(fedt_t);
    const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
    const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
    assert(eventSize>=fedSize - sizeof(fedt_t));
    eventSize -= (fedSize - sizeof(fedt_t));
    const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
    const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
    if(fedId>FEDNumbering::MAXFEDID)
    {
      throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
    }
    if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
      tcds_pointer_ = (unsigned char *)(event + eventSize );
    }
    if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
      if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
        GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
      else
        GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
      //evf::evtn::evm_board_setformat(fedSize);
      const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
      const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
      tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
    }
    //take event ID from GTPE FED
    if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
      if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
        GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
      }
    }
    FEDRawData& fedData = rawData.FEDData(fedId);
    fedData.resize(fedSize);
    memcpy(fedData.data(), event + eventSize, fedSize);
  }
  assert(eventSize == 0);

  return tstamp;
}
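
The loop walks the event payload from the end towards the beginning: it reads the trailer of the last fragment, uses the size stored there (in 8-byte words) to find that fragment's header, copies the fragment, and repeats until no bytes remain. The sketch below reproduces this backward walk on a deliberately simplified framing. ToyHeader, ToyTrailer, splitFragments and makeFragment are hypothetical and do not reproduce the real FED header and trailer bit layout or the FED_*_EXTRACT macros.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Simplified, hypothetical framing (NOT the real FED bit layout).
struct ToyHeader  { std::uint32_t sourceId; std::uint32_t reserved; };
// sizeInWords counts 8-byte words, header and trailer included.
struct ToyTrailer { std::uint32_t reserved; std::uint32_t sizeInWords; };

// Split a concatenation of fragments, walking from the last trailer backwards.
std::map<std::uint32_t, std::vector<unsigned char>>
splitFragments(const unsigned char* event, std::size_t eventSize) {
  std::map<std::uint32_t, std::vector<unsigned char>> out;
  while (eventSize > 0) {
    if (eventSize < sizeof(ToyTrailer))
      throw std::runtime_error("truncated trailer");
    ToyTrailer trailer;
    std::memcpy(&trailer, event + eventSize - sizeof(ToyTrailer), sizeof(trailer));
    const std::size_t fragSize = static_cast<std::size_t>(trailer.sizeInWords) << 3;
    if (fragSize < sizeof(ToyHeader) + sizeof(ToyTrailer) || fragSize > eventSize)
      throw std::runtime_error("bad fragment size");
    eventSize -= fragSize;  // now the offset of this fragment's header
    ToyHeader header;
    std::memcpy(&header, event + eventSize, sizeof(header));
    out[header.sourceId].assign(event + eventSize, event + eventSize + fragSize);
  }
  return out;
}

// Build one toy fragment with the given number of zeroed 8-byte payload words.
std::vector<unsigned char> makeFragment(std::uint32_t sourceId, std::size_t payloadWords) {
  const std::uint32_t words = static_cast<std::uint32_t>(payloadWords + 2);
  std::vector<unsigned char> buf(static_cast<std::size_t>(words) * 8, 0);
  const ToyHeader h{sourceId, 0};
  const ToyTrailer t{0, words};
  std::memcpy(buf.data(), &h, sizeof(h));
  std::memcpy(buf.data() + buf.size() - sizeof(t), &t, sizeof(t));
  return buf;
}
```

The sketch throws on inconsistent sizes where the listing uses assert(); which of the two is appropriate depends on whether corrupted input is expected in production.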
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
assert(m_qm.get())
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative crieriion will lead to accepting the event(this again matches the behavior of"!*"before the partial wildcard feature was incorporated).The per-event"cost"of each negative criterion with multiple relevant triggers is about the same as!*was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
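The timestamp built at the end of fillFEDRawDataCollection() joins the two 32-bit GPS words read from the FED header into one 64-bit time value. A minimal sketch of that step, assuming plain integers in place of the evf::evtn accessors; `combineGpsWords` is a hypothetical helper and not part of CMSSW:

```cpp
#include <cstdint>

// Hypothetical helper (not a CMSSW function): joins the high and low 32-bit
// GPS words into a single 64-bit value. The high word is widened to 64 bits
// before the shift so that no bits are lost.
inline std::uint64_t combineGpsWords(std::uint32_t high, std::uint32_t low) {
  return (static_cast<std::uint64_t>(high) << 32) | low;
}
```

Because the low word always fits in 32 bits, the bitwise OR used here gives the same result as the addition in the listing.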
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int lumi, bool erase )

Definition at line 1325 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1326 {
1327  std::lock_guard<std::mutex> lock(monlock_);
1328  auto itr = sourceEventsReport_.find(lumi);
1329  if (itr!=sourceEventsReport_.end()) {
1330  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1331  if (erase)
1332  sourceEventsReport_.erase(itr);
1333  return ret;
1334  }
1335  else
1336  return std::pair<bool,unsigned int>(false,0);
1337 }
std::map< unsigned int, unsigned int > sourceEventsReport_
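getEventReport() is a mutex-guarded map lookup that can optionally consume the entry it finds. The pattern can be sketched in isolation as follows; `EventReportBook` and its `report()` method are hypothetical stand-ins for the class members, introduced only for illustration:

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the per-lumisection event counters of the source.
class EventReportBook {
public:
  // Records the number of events seen for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns {true, count} if the lumisection is known, {false, 0} otherwise.
  // With erase == true the entry is removed after its value has been copied.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0u};
    const std::pair<bool, unsigned int> result(true, itr->second);
    if (erase) counts_.erase(itr);
    return result;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Copying the value before erasing matters: the iterator is invalid once its element has been removed.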
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int & ls, std::string & nextFile, uint32_t & fsize, uint64_t & lockWaitTime )
private

Definition at line 1368 of file FedRawDataInputSource.cc.

References fileListIndex_, MillePedeFileConverter_cfg::fileName, fileNames_, evf::EvFDaqDirector::newFile, fed_dqm_sourceclient-live_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1369 {
1370  if (fileListIndex_ < fileNames_.size()) {
1371  nextFile = fileNames_[fileListIndex_];
1372  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1373  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1374  boost::filesystem::path fileName = nextFile;
1375  std::string fileStem = fileName.stem().string();
1376  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1377  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1378  ls = boost::lexical_cast<unsigned int>(fileStem);
1379  //fsize = 0;
1380  //lockWaitTime = 0;
1381  fileListIndex_++;
1383  }
1384  else
1386 }
std::vector< std::string > fileNames_
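getFile() derives the lumisection number from the file stem by cutting out the text between "ls" and the next "_". The listing uses the result of `std::string::find` directly as a condition; `find` returns `std::string::npos` (a non-zero value) on a miss and 0 for a match at the start, so an explicit comparison is the more robust form. A minimal sketch of such a parser, assuming stems of the form `run000123_ls0042_index000001`; `parseLumiFromStem` is a hypothetical helper, not a CMSSW function:

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: extracts the number that follows "ls" in a file stem.
// Returns false if there is no "ls" field or it is not followed by digits.
inline bool parseLumiFromStem(const std::string& stem, unsigned int& ls) {
  const std::size_t start = stem.find("ls");
  if (start == std::string::npos) return false;  // a miss is npos, not 0

  const std::size_t first = start + 2;
  std::size_t last = first;
  unsigned long value = 0;
  // Consume decimal digits; parsing stops at "_" or any other character.
  while (last < stem.size() && stem[last] >= '0' && stem[last] <= '9') {
    value = value * 10 + static_cast<unsigned long>(stem[last] - '0');
    ++last;
  }
  if (last == first) return false;  // "ls" was not followed by digits

  ls = static_cast<unsigned int>(value);
  return true;
}
```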
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 363 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, singleBufferMode_, ntuplemaker::status, InputFile::status_, streamFileTrackerPtr_, threadError(), evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

364 {
365 
367  if (!currentFile_)
368  {
369  if (!streamFileTrackerPtr_) {
373  }
374 
376  if (!fileQueue_.try_pop(currentFile_))
377  {
378  //sleep until wakeup (only in single-buffer mode) or timeout
379  std::unique_lock<std::mutex> lkw(mWakeup_);
380  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
382  }
383  status = currentFile_->status_;
384  if ( status == evf::EvFDaqDirector::runEnded)
385  {
386  delete currentFile_;
387  currentFile_=nullptr;
388  return status;
389  }
390  else if ( status == evf::EvFDaqDirector::runAbort)
391  {
392  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
393  }
394  else if (status == evf::EvFDaqDirector::newLumi)
395  {
396  if (getLSFromFilename_) {
399  eventsThisLumi_=0;
401  }
402  }
403  else {//let this be picked up from next event
405  }
406 
407  delete currentFile_;
408  currentFile_=nullptr;
409  return status;
410  }
411  else if (status == evf::EvFDaqDirector::newFile) {
414  }
415  else
416  assert(0);
417  }
418 
419  //file is empty
420  if (!currentFile_->fileSize_) {
422  //try to open new lumi
424  if (getLSFromFilename_)
427  eventsThisLumi_=0;
429  }
430  //immediately delete empty file
432  delete currentFile_;
433  currentFile_=nullptr;
435  }
436 
437  //file is finished
440  //release last chunk (it is never released elsewhere)
443  {
444  throw cms::Exception("FedRawDataInputSource::getNextEvent")
445  << "Fully processed " << currentFile_->nProcessed_
446  << " from the file " << currentFile_->fileName_
447  << " but according to BU JSON there should be "
448  << currentFile_->nEvents_ << " events";
449  }
450  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
451  if (singleBufferMode_) {
452  std::unique_lock<std::mutex> lkw(mWakeup_);
453  cvWakeup_.notify_one();
454  }
457  //put the file in pending delete list;
458  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
459  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
460  }
461  else {
462  //in single-thread and stream jobs, events are already processed
464  delete currentFile_;
465  }
466  currentFile_=nullptr;
468  }
469 
470 
471  //file is too short
473  {
474  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
475  "Premature end of input file while reading event header";
476  }
477  if (singleBufferMode_) {
478 
479  //should already be there
481  usleep(10000);
483  }
484 
485  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
486 
487  //conditions when read amount is not sufficient for the header to fit
488  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
490  {
492 
493  if (detectedFRDversion_==0) {
494  detectedFRDversion_=*((uint32*)dataPosition);
495  if (detectedFRDversion_>5)
496  throw cms::Exception("FedRawDataInputSource::getNextEvent")
497  << "Unknown FRD version -: " << detectedFRDversion_;
498  assert(detectedFRDversion_>=1);
499  }
500 
501  //recalculate chunk position
502  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
503  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
504  {
505  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
506  "Premature end of input file while reading event header";
507  }
508  }
509 
510  event_.reset( new FRDEventMsgView(dataPosition) );
511  if (event_->size()>eventChunkSize_) {
512  throw cms::Exception("FedRawDataInputSource::getNextEvent")
513  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
514  << " run:" << event_->run() << " of size:" << event_->size()
515  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
516  }
517 
518  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
519 
521  {
522  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
523  "Premature end of input file while reading event data";
524  }
525  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
527  //recalculate chunk position
528  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
529  event_.reset( new FRDEventMsgView(dataPosition) );
530  }
531  currentFile_->bufferPosition_ += event_->size();
532  currentFile_->chunkPosition_ += event_->size();
533  //last chunk is released when this function is invoked next time
534 
535  }
536  //multibuffer mode:
537  else
538  {
539  //wait for the current chunk to become added to the vector
541  usleep(10000);
543  }
544 
545  //check if header is at the boundary of two chunks
546  chunkIsFree_ = false;
547  unsigned char *dataPosition;
548 
549  //read header, copy it to a single chunk if necessary
550  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
551 
552  event_.reset( new FRDEventMsgView(dataPosition) );
553  if (event_->size()>eventChunkSize_) {
554  throw cms::Exception("FedRawDataInputSource::getNextEvent")
555  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
556  << " run:" << event_->run() << " of size:" << event_->size()
557  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
558  }
559 
560  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
561 
563  {
564  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
565  "Premature end of input file while reading event data";
566  }
567 
568  if (chunkEnd) {
569  //header was at the chunk boundary, we will have to move payload as well
570  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
571  chunkIsFree_ = true;
572  }
573  else {
574  //header was contiguous, but check if payload fits the chunk
575  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
576  //rewind to header start position
577  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
578  //copy event to a chunk start and move pointers
579  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
580  assert(chunkEnd);
581  chunkIsFree_=true;
582  //header is moved
583  event_.reset( new FRDEventMsgView(dataPosition) );
584  }
585  else {
586  //everything is in a single chunk, only move pointers forward
587  chunkEnd = currentFile_->advance(dataPosition,msgSize);
588  assert(!chunkEnd);
589  chunkIsFree_=false;
590  }
591  }
592  }//end multibuffer mode
593 
594  if (verifyChecksum_ && event_->version() >= 5)
595  {
596  uint32_t crc=0;
597  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
598  if ( crc != event_->crc32c() ) {
600  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
601  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
602  " but calculated 0x" << crc;
603  }
604  }
605  else if ( verifyAdler32_ && event_->version() >= 3)
606  {
607  uint32_t adler = adler32(0L,Z_NULL,0);
608  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
609 
610  if ( adler != event_->adler32() ) {
612  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
613  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
614  " but calculated 0x" << adler;
615  }
616  }
617 
618 
620 
622 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
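At the end of getNextEvent() the payload checksum is recomputed and compared with the value stored in the event header (CRC32C for FRD version 5 and later, otherwise Adler-32 for version 3 and later). The sketch below illustrates the Adler-32 branch only. It writes the algorithm out by hand so that it has no zlib dependency, whereas the listing calls zlib's `adler32()`; `adler32Checksum` and `payloadMatches` are hypothetical names:

```cpp
#include <cstddef>
#include <cstdint>

// Plain Adler-32, the same algorithm that zlib's adler32() implements.
inline std::uint32_t adler32Checksum(const unsigned char* data, std::size_t len) {
  const std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = 1;               // running sum of bytes, seeded with 1
  std::uint32_t b = 0;               // running sum of the successive values of a
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Hypothetical check mirroring the comparison made against the header value.
inline bool payloadMatches(const unsigned char* payload, std::size_t len,
                           std::uint32_t expected) {
  return adler32Checksum(payload, len) == expected;
}
```

Reducing modulo 65521 on every byte is simple but slow; production implementations defer the reduction over blocks of input.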
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 780 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, alignCSCRings::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), fed_dqm_sourceclient-live_cfg::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

781 {
783  try {
784  // assemble json destination path
786 
787  //TODO:should be ported to use fffnaming
788  std::ostringstream fileNameWithPID;
789  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
790  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
791  jsonDestPath /= fileNameWithPID.str();
792 
793  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
794  << jsonDestPath;
795  try {
796  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
797  }
798  catch (const boost::filesystem::filesystem_error& ex)
799  {
800  // Input dir gone?
801  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
802  // << " Maybe the file is not yet visible by FU. Trying again in one second";
803  sleep(1);
804  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
805  }
807 
808  try {
809  //sometimes this fails but file gets deleted
810  boost::filesystem::remove(jsonSourcePath);
811  }
812  catch (const boost::filesystem::filesystem_error& ex)
813  {
814  // Input dir gone?
815  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
816  }
817  catch (std::exception& ex)
818  {
819  // Input dir gone?
820  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
821  }
822 
823  boost::filesystem::ifstream ij(jsonDestPath);
824  Json::Value deserializeRoot;
826 
827  if (!reader.parse(ij, deserializeRoot))
828  throw std::runtime_error("Cannot deserialize input JSON file");
829 
830  //read BU JSON
832  DataPoint dp;
833  dp.deserialize(deserializeRoot);
834  bool success = false;
835  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
836  if (dpd_->getNames().at(i)=="NEvents")
837  if (i<dp.getData().size()) {
838  data = dp.getData()[i];
839  success=true;
840  }
841  }
842  if (!success) {
843  if (dp.getData().size())
844  data = dp.getData()[0];
845  else
846  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
847  " error reading number of events from BU JSON -: No input value " << data;
848  }
849  return boost::lexical_cast<int>(data);
850 
851  }
852  catch (const boost::filesystem::filesystem_error& ex)
853  {
854  // Input dir gone?
856  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
857  }
858  catch (std::runtime_error e)
859  {
860  // Another process grabbed the file and NFS did not register this
862  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
863  }
864 
865  catch( boost::bad_lexical_cast const& ) {
866  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
867  << "Input value is -: " << data;
868  }
869 
870  catch (std::exception e)
871  {
872  // BU run directory disappeared?
874  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
875  }
876 
877  return -1;
878 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
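grabNextJsonFile() builds the destination file name by appending the process ID, zero-padded to five digits, to the stem of the source JSON file. A minimal sketch of that formatting step, with the PID passed in as a parameter instead of being taken from `getpid()`; `jsonNameWithPid` is a hypothetical helper:

```cpp
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper: "<stem>_pid<5-digit pid>.jsn".
// std::setw applies only to the next inserted item, so the padding affects
// the PID and not the ".jsn" suffix. PIDs wider than five digits are written
// in full, since setw sets a minimum width only.
inline std::string jsonNameWithPid(const std::string& stem, long pid) {
  std::ostringstream name;
  name << stem << "_pid" << std::setfill('0') << std::setw(5) << pid << ".jsn";
  return name.str();
}
```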
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1339 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, fed_dqm_sourceclient-live_cfg::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1340 {
1341  std::sort(fileNames_.begin(),fileNames_.end(),
1342  [](std::string a, std::string b) {
1343  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1344  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1345  return b > a;});
1346 
1347  if (fileNames_.size()) {
1348  //get run number from first file in the vector
1350  std::string fileStem = fileName.stem().string();
1351  auto end = fileStem.find("_");
1352  if (fileStem.find("run")==0) {
1353  std::string runStr = fileStem.substr(3,end-3);
1354  try {
1355  //get long to support test run numbers < 2^32
1356  long rval = boost::lexical_cast<long>(runStr);
1357  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1358  return rval;
1359  }
1360  catch( boost::bad_lexical_cast const& ) {
1361  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1362  }
1363  }
1364  }
1365  return -1;
1366 }
std::vector< std::string > fileNames_
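initFileList() orders the input files by file name, ignoring the directory part, and then reads the run number from a leading "run" prefix. The two steps can be sketched as follows, assuming names such as `run000123_ls0001_index000001.raw`; `baseName`, `sortByBaseName`, and `runNumberFromName` are hypothetical helpers that use the standard library in place of boost:

```cpp
#include <algorithm>
#include <string>
#include <vector>

// Returns the part of the path after the last '/', or the whole path.
inline std::string baseName(const std::string& path) {
  const std::string::size_type slash = path.rfind('/');
  return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Sorts paths by file name only, so files from different directories interleave.
inline void sortByBaseName(std::vector<std::string>& files) {
  std::sort(files.begin(), files.end(),
            [](const std::string& a, const std::string& b) {
              return baseName(a) < baseName(b);
            });
}

// Returns the number encoded as "run<digits>" at the start of the file name,
// or -1 if the name does not follow that pattern.
inline long runNumberFromName(const std::string& path) {
  const std::string name = baseName(path);
  if (name.compare(0, 3, "run") != 0) return -1;
  std::string::size_type pos = 3;
  long run = 0;
  while (pos < name.size() && name[pos] >= '0' && name[pos] <= '9') {
    run = run * 10 + (name[pos] - '0');
    ++pos;
  }
  return pos == 3 ? -1 : run;
}
```

Taking the comparator arguments by const reference avoids copying both strings on every comparison.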
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 312 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

313 {
315  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
316 
317  if ( currentLumiSection_ > 0) {
318  const std::string fuEoLS =
320  struct stat buf;
321  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
322  if ( !found ) {
324  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
325  close(eol_fd);
326  createBoLSFile(lumiSection,false);
328  }
329  }
330  else createBoLSFile(lumiSection,true);//needed for initial lumisection
331 
332  currentLumiSection_ = lumiSection;
333 
335 
336  timeval tv;
337  gettimeofday(&tv, 0);
338  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
339 
340  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
342  runAuxiliary()->run(),
343  lumiSection, lsopentime,
345 
346  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
347  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
348 
349  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
350  }
351 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:357
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:262
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:365
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:265
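The lumisection open time in maybeOpenNewLumiSection() is the current wall-clock time from `gettimeofday()`, expressed as microseconds since the epoch. A minimal sketch of the conversion, taking the two `timeval` fields as plain integers; `toMicroseconds` is a hypothetical helper:

```cpp
#include <cstdint>

// Hypothetical helper: converts a (seconds, microseconds) pair to a single
// microsecond count. Both terms are widened to 64 bits before the arithmetic
// so the multiplication cannot overflow a 32-bit type.
inline std::uint64_t toMicroseconds(long seconds, long microseconds) {
  return static_cast<std::uint64_t>(seconds) * 1000000ULL +
         static_cast<std::uint64_t>(microseconds);
}
```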
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 353 of file FedRawDataInputSource.cc.

References getNextEvent(), load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and ntuplemaker::status.

Referenced by checkNextEvent().

354 {
356  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
357  {
358  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
359  }
360  return status;
361 }
volatile std::atomic< bool > shutdown_flag
evf::EvFDaqDirector::FileStatus getNextEvent()
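nextEvent() keeps calling getNextEvent() for as long as it reports noFile, and leaves the loop early once the global shutdown flag is set. The control flow can be sketched on its own as follows; the `FileStatus` enum and `pollUntilAvailable` are simplified, hypothetical stand-ins for the CMSSW types:

```cpp
#include <atomic>
#include <functional>

// Simplified stand-in for evf::EvFDaqDirector::FileStatus.
enum class FileStatus { NoFile, NewFile, RunEnded };

// Calls poll() until it returns something other than NoFile, or until the
// shutdown flag is observed. In the shutdown case the last status (NoFile)
// is returned so the caller can tell that nothing was obtained.
inline FileStatus pollUntilAvailable(const std::function<FileStatus()>& poll,
                                     const std::atomic<bool>& shutdown) {
  FileStatus status;
  while ((status = poll()) == FileStatus::NoFile) {
    // A relaxed load is enough here: the flag is only a stop request and
    // carries no other data that needs ordering.
    if (shutdown.load(std::memory_order_relaxed)) break;
  }
  return status;
}
```

The loop itself never sleeps. In the listing above, the waiting is done inside getNextEvent(), which blocks on a condition variable for up to 100 ms when the file queue is empty.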
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource > )
override private virtual

Definition at line 883 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

884 {
885  InputSource::rewind();
889 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:353
void FedRawDataInputSource::preForkReleaseResources ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 880 of file FedRawDataInputSource.cc.

881 {}
void FedRawDataInputSource::read ( edm::EventPrincipal & eventPrincipal )
override protected virtual

Implements edm::RawInputSource.

Definition at line 657 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, i, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

658 {
659  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
660  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
661 
662  if (useL1EventID_){
664  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
666  aux.setProcessHistoryID(processHistoryID_);
667  makeEvent(eventPrincipal, aux);
668  }
669  else if(tcds_pointer_==0){
672  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
674  aux.setProcessHistoryID(processHistoryID_);
675  makeEvent(eventPrincipal, aux);
676  }
677  else{
678  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
681  processGUID());
683  makeEvent(eventPrincipal, aux);
684  }
685 
686 
687 
688  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
689 
690  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
691  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
692  // daqProvenanceHelper_.dummyProvenance_);
693 
694  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
696 
697  eventsThisLumi_++;
698 
699  //this old file check runs no more often than every 10 events
700  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
701  //delete files that are not in processing
702  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
703  auto it = filesToDelete_.begin();
704  while (it!=filesToDelete_.end()) {
705  bool fileIsBeingProcessed = false;
706  for (unsigned int i=0;i<nStreams_;i++) {
707  if (it->first == streamFileTrackerPtr_->at(i)) {
708  fileIsBeingProcessed = true;
709  break;
710  }
711  }
712  if (!fileIsBeingProcessed) {
713  deleteFile(it->second->fileName_);
714  delete it->second;
715  it = filesToDelete_.erase(it);
716  }
717  else it++;
718  }
719 
720  }
722  chunkIsFree_=false;
723  return;
724 }
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:217
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
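The cleanup loop at the end of read() walks the pending-delete list and removes every file whose index is no longer tracked by any stream. A minimal sketch of this erase-while-iterating pattern, using file names in place of `InputFile` pointers; `purgeUnusedFiles` is a hypothetical helper and performs no real file deletion:

```cpp
#include <algorithm>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper: removes every pending (index, name) entry whose index
// does not appear in inUse, and returns the names that were removed.
inline std::vector<std::string> purgeUnusedFiles(
    std::list<std::pair<int, std::string>>& pending,
    const std::vector<int>& inUse) {
  std::vector<std::string> removed;
  auto it = pending.begin();
  while (it != pending.end()) {
    const bool busy =
        std::find(inUse.begin(), inUse.end(), it->first) != inUse.end();
    if (busy) {
      ++it;  // still being processed by a stream, keep it for a later pass
    } else {
      removed.push_back(it->second);
      it = pending.erase(it);  // erase() returns the next valid iterator
    }
  }
  return removed;
}
```

Advancing the iterator only in the branch that keeps the element is what makes the loop safe: after `erase()` the old iterator must not be incremented.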
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1260 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1261 {
1262 
1263  if (fileDescriptor_<0) {
1264  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1265  bufferInputRead_ = 0;
1266  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1267  if (fileDescriptor_>=0)
1268  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1269  else
1270  {
1271  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1272  << file->fileName_ << " fd:" << fileDescriptor_;
1273  }
1274  }
1275 
1276  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1277  uint32_t existingSize = 0;
1278  for (unsigned int i=0;i<readBlocks_;i++)
1279  {
1280  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1281  bufferInputRead_+=last;
1282  existingSize+=last;
1283  }
1284  }
1285  else {
1286  const uint32_t chunksize = file->chunkPosition_;
1287  const uint32_t blockcount=chunksize/eventChunkBlock_;
1288  const uint32_t leftsize = chunksize%eventChunkBlock_;
1289  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1290  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1291 
1292  for (uint32_t i=0;i<blockcount;i++) {
1293  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1294  bufferInputRead_+=last;
1295  existingSize+=last;
1296  }
1297  if (leftsize) {
1298  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1299  bufferInputRead_+=last;
1300  }
1301  file->chunkPosition_=0;//data was moved to beginning of the chunk
1302  }
1303  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1304  if (fileDescriptor_!=-1)
1305  {
1306  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1307  close(fileDescriptor_);
1308  fileDescriptor_=-1;
1309  }
1310  }
1311 }
#define LogDebug(id)
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
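When the chunk position is non-zero, readNextChunkIntoBuffer() moves the unread tail of the chunk to its start and then reads as many new bytes as were already consumed. The buffer handling can be sketched with an in-memory source in place of the file descriptor; `compactAndRefill` is a hypothetical helper and, unlike the listing, performs the refill as a single copy and not in fixed-size blocks:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical helper: shifts the unread bytes [position, size) to the front
// of the chunk, refills the freed space from source, and resets position.
// Returns the number of bytes taken from source.
inline std::size_t compactAndRefill(std::vector<unsigned char>& chunk,
                                    std::size_t& position,
                                    const unsigned char* source,
                                    std::size_t available) {
  assert(position <= chunk.size());
  const std::size_t leftover = chunk.size() - position;
  // Source and destination may overlap, so memmove is required here.
  if (leftover) std::memmove(chunk.data(), chunk.data() + position, leftover);
  // The freed space equals the number of bytes already consumed.
  const std::size_t toRead = std::min(position, available);
  if (toRead) std::memcpy(chunk.data() + leftover, source, toRead);
  position = 0;  // data now starts at the beginning of the chunk
  return toRead;
}
```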
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 895 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), i, InputFile, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, fed_dqm_sourceclient-live_cfg::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, ntuplemaker::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

896 {
897  bool stop=false;
898  unsigned int currentLumiSection = 0;
899  //threadInit_.exchange(true,std::memory_order_acquire);
900 
901  {
902  std::unique_lock<std::mutex> lk(startupLock_);
903  startupCv_.notify_one();
904  }
905 
906  uint32_t ls=0;
907  uint32_t monLS=1;
908  uint32_t lockCount=0;
909  uint64_t sumLockWaitTimeUs=0.;
910 
911  while (!stop) {
912 
913  //wait for at least one free thread and chunk
914  int counter=0;
916  {
917  std::unique_lock<std::mutex> lkw(mWakeup_);
918  //sleep until woken up by condition or a timeout
919  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
920  counter++;
921  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
922  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
923  }
924  else {
925  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
926  }
927  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
928  }
929 
930  if (stop) break;
931 
932  //look for a new file
933  std::string nextFile;
934  uint32_t fileSize;
935 
937 
939 
940  while (status == evf::EvFDaqDirector::noFile) {
941  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
942  stop=true;
943  break;
944  }
945 
946  uint64_t thisLockWaitTimeUs=0.;
947  if (fileListMode_) {
948  //return LS if LS not set, otherwise return file
949  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
950  }
951  else
952  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
953 
954  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
955 
956  //monitoring of lock wait time
957  if (thisLockWaitTimeUs>0.)
958  sumLockWaitTimeUs+=thisLockWaitTimeUs;
959  lockCount++;
960  if (ls>monLS) {
961  monLS=ls;
962  if (lockCount)
963  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
964  lockCount=0;
965  sumLockWaitTimeUs=0;
966  }
967 
968  //check again for any remaining index/EoLS files after EoR file is seen
969  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
970  usleep(100000);
971  //now all files should have appeared in ramdisk, check again if any raw files were left behind
972  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
973  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
974  }
975 
976  if ( status == evf::EvFDaqDirector::runEnded) {
978  stop=true;
979  break;
980  }
981 
982  //queue new lumisection
983  if( getLSFromFilename_ && ls > currentLumiSection) {
984  currentLumiSection = ls;
985  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
986  }
987 
988  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
989  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
991  stop=true;
992  break;
993  }
994 
995  int dbgcount=0;
996  if (status == evf::EvFDaqDirector::noFile) {
997  dbgcount++;
998  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
999  usleep(100000);
1000  }
1001  }
1002  if ( status == evf::EvFDaqDirector::newFile ) {
1003  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1004 
1005 
1006  boost::filesystem::path rawFilePath(nextFile);
1007  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1008 
1009  struct stat st;
1010  int stat_res = stat(rawFile.c_str(),&st);
1011  if (stat_res==-1) {
1012  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1013  setExceptionState_=true;
1014  stop=true;
1015  break;
1016  }
1017  fileSize=st.st_size;
1018 
1019  if (fms_) fms_->stoppedLookingForFile(ls);
1020  int eventsInNewFile;
1021  if (fileListMode_) {
1022  if (fileSize==0) eventsInNewFile=0;
1023  else eventsInNewFile=-1;
1024  }
1025  else {
1026  eventsInNewFile = grabNextJsonFile(nextFile);
1027  assert( eventsInNewFile>=0 );
1028  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1029  }
1030  //int eventsInNewFile = fileListMode_ ? -1 : grabNextJsonFile(nextFile);
1031  //if (fileListMode_ && fileSize==0) {eventsInNewFile=0;}
1032  //if (!fileListMode_) {
1033  // assert( eventsInNewFile>=0 );
1034  // assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1035  //}
1036 
1037  if (!singleBufferMode_) {
1038  //calculate number of needed chunks
1039  unsigned int neededChunks = fileSize/eventChunkSize_;
1040  if (fileSize%eventChunkSize_) neededChunks++;
1041 
1042  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1044  fileQueue_.push(newInputFile);
1045 
1046  for (unsigned int i=0;i<neededChunks;i++) {
1047 
1048  //get thread
1049  unsigned int newTid = 0xffffffff;
1050  while (!workerPool_.try_pop(newTid)) {
1051  usleep(100000);
1052  }
1053 
1054  InputChunk * newChunk = nullptr;
1055  while (!freeChunks_.try_pop(newChunk)) {
1056  usleep(100000);
1057  if (quit_threads_.load(std::memory_order_relaxed)) break;
1058  }
1059 
1060  if (newChunk == nullptr) {
1061  //return unused tid if we received shutdown (nullptr chunk)
1062  if (newTid!=0xffffffff) workerPool_.push(newTid);
1063  stop = true;
1064  break;
1065  }
1066 
1067  std::unique_lock<std::mutex> lk(mReader_);
1068 
1069  unsigned int toRead = eventChunkSize_;
1070  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1071  newChunk->reset(i*eventChunkSize_,toRead,i);
1072 
1073  workerJob_[newTid].first=newInputFile;
1074  workerJob_[newTid].second=newChunk;
1075 
1076  //wake up the worker thread
1077  cvReader_[newTid]->notify_one();
1078  }
1079  }
1080  else {
1081  if (!eventsInNewFile) {
1082  //still queue file for lumi update
1083  std::unique_lock<std::mutex> lkw(mWakeup_);
1084  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1086  fileQueue_.push(newInputFile);
1087  cvWakeup_.notify_one();
1088  return;
1089  }
1090  //in single-buffer mode put single chunk in the file and let the main thread read the file
1091  InputChunk * newChunk;
1092  //should be available immediately
1093  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1094 
1095  std::unique_lock<std::mutex> lkw(mWakeup_);
1096 
1097  unsigned int toRead = eventChunkSize_;
1098  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1099  newChunk->reset(0,toRead,0);
1100  newChunk->readComplete_=true;
1101 
1102  //push file and wakeup main thread
1103  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1104  newInputFile->chunks_[0]=newChunk;
1106  fileQueue_.push(newInputFile);
1107  cvWakeup_.notify_one();
1108  }
1109  }
1110  }
1111  //make sure threads finish reading
1112  unsigned numFinishedThreads = 0;
1113  while (numFinishedThreads < workerThreads_.size()) {
1114  unsigned int tid;
1115  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1116  std::unique_lock<std::mutex> lk(mReader_);
1117  thread_quit_signal[tid]=true;
1118  cvReader_[tid]->notify_one();
1119  numFinishedThreads++;
1120  }
1121  for (unsigned int i=0;i<workerThreads_.size();i++) {
1122  workerThreads_[i]->join();
1123  delete workerThreads_[i];
1124  }
1125 }
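The chunk arithmetic used in readSupervisor() (the neededChunks and toRead computations) can be illustrated in isolation. The following is a minimal sketch, not code from FedRawDataInputSource.cc: ChunkSpan and planChunks are hypothetical names, and the offset is widened to 64 bits here, whereas the listing passes an unsigned int to InputChunk::reset().

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper type: one entry per chunk a file is split into.
struct ChunkSpan {
  uint64_t offset;  // byte offset of the chunk within the file
  uint32_t toRead;  // number of bytes to read for this chunk
};

// Split a file of fileSize bytes into chunks of at most chunkSize bytes,
// following the neededChunks / toRead arithmetic of the listing.
std::vector<ChunkSpan> planChunks(uint32_t fileSize, uint32_t chunkSize) {
  assert(chunkSize > 0);
  uint32_t needed = fileSize / chunkSize;
  if (fileSize % chunkSize) needed++;  // a partial trailing chunk counts too

  std::vector<ChunkSpan> plan;
  plan.reserve(needed);
  for (uint32_t i = 0; i < needed; i++) {
    uint32_t toRead = chunkSize;
    // Only the last chunk can be shorter than chunkSize.
    if (i == needed - 1 && fileSize % chunkSize) toRead = fileSize % chunkSize;
    plan.push_back({static_cast<uint64_t>(i) * chunkSize, toRead});
  }
  return plan;
}
```

Under these assumptions, a 10-byte file with a 4-byte chunk size yields three chunks, the last covering bytes 8 and 9; an empty file yields no chunks.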
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1127 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, eventChunkBlock_, InputChunk::fileIndex_, InputFile::fileName_, LogDebug, mReader_, InputChunk::offset_, readBlocks_, InputChunk::readComplete_, setExceptionState_, startupCv_, startupLock_, thread_quit_signal, threadInit_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1128 {
1129  bool init = true;
1130  threadInit_.exchange(true,std::memory_order_acquire);
1131 
1132  while (1) {
1133 
1134  std::unique_lock<std::mutex> lk(mReader_);
1135  workerJob_[tid].first=nullptr;
1136  workerJob_[tid].second=nullptr;
1137 
1138  assert(!thread_quit_signal[tid]);//should never get it here
1139  workerPool_.push(tid);
1140 
1141  if (init) {
1142  std::unique_lock<std::mutex> lk(startupLock_);
1143  init = false;
1144  startupCv_.notify_one();
1145  }
1146  cvReader_[tid]->wait(lk);
1147 
1148  if (thread_quit_signal[tid]) return;
1149 
1150  InputFile * file;
1151  InputChunk * chunk;
1152 
1153  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1154 
1155  file = workerJob_[tid].first;
1156  chunk = workerJob_[tid].second;
1157 
1158  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1159  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1160 
1161 
1162  if (fileDescriptor>=0)
1163  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1164  else
1165  {
1166  edm::LogError("FedRawDataInputSource") <<
1167  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1168  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1169  setExceptionState_=true;
1170  return;
1171 
1172  }
1173 
1174  unsigned int bufferLeft = 0;
1176  for (unsigned int i=0;i<readBlocks_;i++)
1177  {
1178  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1179  if ( last > 0 )
1180  bufferLeft+=last;
1181  if (last < eventChunkBlock_) {
1182  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1183  break;
1184  }
1185  }
1187  auto diff = end-start;
1188  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1189  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1190  close(fileDescriptor);
1191 
1192  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1194  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1195  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1196 
1197  }
1198 }
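The block-wise read loop in readWorker() can be sketched on its own. This is a simplified illustration under stated assumptions, not the code of the class: readInBlocks is a hypothetical function name, it returns -1 on a read error instead of handling a negative return in the same branch as a short read, and it does not retry on EINTR.

```cpp
#include <cassert>
#include <cstddef>
#include <sys/types.h>
#include <unistd.h>

// Read up to maxBlocks blocks of blockSize bytes from fd into buf.
// Returns the number of bytes stored, or -1 if read() reported an error.
// The caller must provide a buffer of at least maxBlocks * blockSize bytes.
ssize_t readInBlocks(int fd, unsigned char* buf, size_t blockSize, unsigned int maxBlocks) {
  assert(buf != nullptr && blockSize > 0);
  size_t filled = 0;
  for (unsigned int i = 0; i < maxBlocks; i++) {
    const ssize_t last = ::read(fd, buf + filled, blockSize);
    if (last < 0) return -1;  // surface the error to the caller
    filled += static_cast<size_t>(last);
    // A short read means the data ended before the block was full.
    if (static_cast<size_t>(last) < blockSize) break;
  }
  return static_cast<ssize_t>(filled);
}
```

As in the listing, the loop stops at the first short read, so the caller can compare the returned byte count with the expected chunk size.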
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1314 of file FedRawDataInputSource.cc.

References monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1315 {
1316 
1317  std::lock_guard<std::mutex> lock(monlock_);
1318  auto itr = sourceEventsReport_.find(lumi);
1319  if (itr!=sourceEventsReport_.end())
1320  itr->second+=events;
1321  else
1322  sourceEventsReport_[lumi]=events;
1323 }
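The mutex-protected per-lumi accumulation can be sketched as a small standalone class. LumiEventCounter and its method names are hypothetical; the get() method assumes semantics suggested by the signature of getEventReport(unsigned int lumi, bool erase), whose body is not shown in this section.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the sourceEventsReport_ / monlock_ pair.
class LumiEventCounter {
public:
  // Add events to the running total for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    // operator[] value-initializes a missing entry to 0 before the addition,
    // which covers both the "found" and "not found" cases in one statement.
    counts_[lumi] += events;
  }

  // Returns {found, count}; optionally erases the entry once it is read.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0u};
    const unsigned int n = itr->second;
    if (erase) counts_.erase(itr);
    return {true, n};
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Holding the lock for the whole find-and-update step keeps the reporting thread and the reading thread from observing a partially updated entry.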
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 891 of file FedRawDataInputSource.cc.

892 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1200 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1201 {
1202  quit_threads_=true;
1203  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1204 
1205 }
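The pattern in threadError(), raising a shared quit flag before throwing so that other threads polling the flag can stop, can be sketched as follows. The names quitThreads, signalThreadError and shouldStop are hypothetical, and std::runtime_error stands in for cms::Exception so that the sketch compiles without the framework.

```cpp
#include <atomic>
#include <stdexcept>

// Hypothetical stand-in for the quit_threads_ member.
std::atomic<bool> quitThreads{false};

// Raise the flag first, then throw: threads that poll the flag see the stop
// request even though the exception unwinds only the calling thread.
[[noreturn]] void signalThreadError() {
  quitThreads = true;
  throw std::runtime_error("file reader thread error");
}

// Polling side, comparable to the quit_threads_.load(...) checks in readSupervisor().
bool shouldStop() {
  return quitThreads.load(std::memory_order_relaxed);
}
```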

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 176 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 161 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().
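A timed wait on a condition variable, as readSupervisor() performs on cvWakeup_ with a 100 ms timeout, can be sketched as below. This is a variation rather than the class's own logic: the Wakeup struct and its pending flag are hypothetical, and the predicate overload of wait_for is used here so that spurious wakeups are filtered out, whereas the listing calls wait_for without a predicate.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical wrapper pairing a mutex and a condition variable with a flag.
struct Wakeup {
  std::mutex m;
  std::condition_variable cv;
  bool pending = false;  // assumption: not a member of FedRawDataInputSource

  // Record the wakeup under the lock, then notify one waiter.
  void notify() {
    {
      std::lock_guard<std::mutex> lk(m);
      pending = true;
    }
    cv.notify_one();
  }

  // Returns true if a wakeup was pending or arrived in time, false on timeout.
  bool waitFor(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lk(m);
    const bool woken = cv.wait_for(lk, timeout, [this] { return pending; });
    pending = false;  // consume the wakeup
    return woken;
  }
};
```

With the flag, a notification sent before the waiter reaches wait_for is not lost, which a bare notify_one() without shared state cannot guarantee.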

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListMode_
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by deleteFile(), FedRawDataInputSource(), and readSupervisor().

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 163 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<bool> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private