FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< InputFile * > fileQueue_
 
std::list< std::pair< int, InputFile * > > filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef ProductRegistryHelper::TypeLabelList TypeLabelList
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair<InputFile*,InputChunk*> FedRawDataInputSource::ReaderInfo
private

Definition at line 136 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, MillePedeFileConverter_cfg::e, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
68  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
72  eventID_(),
75  tcds_pointer_(0),
76  eventsThisLumi_(0),
77  dpd_(nullptr)
78 {
79  char thishost[256];
80  gethostname(thishost, 255);
81  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
82  << std::endl << (eventChunkSize_/1048576)
83  << " MB on host " << thishost;
84 
85  long autoRunNumber = -1;
86  if (fileListMode_) {
87  autoRunNumber = initFileList();
88  if (!fileListLoopMode_) {
89  if (autoRunNumber<0)
90  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
91  //override run number
92  runNumber_ = (edm::RunNumber_t)autoRunNumber;
93  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
94  }
95  }
96 
98  setNewRun();
99  //todo:autodetect from file name (assert if names differ)
102 
103  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
104  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
105  struct stat statbuf;
106  if (stat(defPath_.c_str(), &statbuf)) {
107  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = defPathSuffix;
110  }
111  }
112 
113  dpd_ = new DataPointDefinition();
114  std::string defLabel = "data";
115  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
116 
117  //make sure that chunk size is N * block size
122 
123  if (!numBuffers_)
124  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
125  "no reading enabled with numBuffers parameter 0";
126 
130 
131  if (!crc32c_hw_test())
132  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
133 
134  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
135  if (fileListMode_) {
136  try {
138  } catch(cms::Exception e) {
139  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
140  }
141  }
142  else {
144  if (!fms_) {
145  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
146  }
147  }
148 
150  if (!daqDirector_)
151  throw cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
152 
153  //set DaqDirector to delete files in preGlobalEndLumi callback
155  if (fms_) {
157  fms_->setInputSource(this);
160  }
161  //should delete chunks when run stops
162  for (unsigned int i=0;i<numBuffers_;i++) {
164  }
165 
166  quit_threads_ = false;
167 
168  for (unsigned int i=0;i<numConcurrentReads_;i++)
169  {
170  std::unique_lock<std::mutex> lk(startupLock_);
171  //issue a memory fence here and in threads (constructor was segfaulting without this)
172  thread_quit_signal.push_back(false);
173  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
174  cvReader_.push_back(new std::condition_variable);
175  tid_active_.push_back(0);
176  threadInit_.store(false,std::memory_order_release);
177  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
178  startupCv_.wait(lk);
179  }
180 
181  runAuxiliary()->setProcessHistoryID(processHistoryID_);
182 }
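
The constructor looks for budef.jsd under $CMSSW_BASE, then under $CMSSW_RELEASE_BASE, and finally falls back to the bare relative path. The lookup order can be sketched on its own as below. This is an illustration, not the CMSSW implementation: envOrEmpty, pathExists and resolveDefPath are hypothetical names, and unlike the listing the sketch skips a variable that is unset, because constructing a std::string from the null pointer returned by getenv is undefined behaviour.

```cpp
#include <cassert>
#include <cstdlib>
#include <initializer_list>
#include <string>
#include <sys/stat.h>

// Hypothetical helper: value of an environment variable, or an empty
// string when the variable is unset (std::getenv returns nullptr then).
std::string envOrEmpty(const char* name) {
  const char* value = std::getenv(name);
  return value ? std::string(value) : std::string();
}

// True if stat() succeeds on the path, i.e. the path exists and is reachable.
bool pathExists(const std::string& path) {
  struct stat statbuf;
  return ::stat(path.c_str(), &statbuf) == 0;
}

// Tries $CMSSW_BASE/<suffix>, then $CMSSW_RELEASE_BASE/<suffix>,
// and falls back to the bare suffix if neither candidate exists.
std::string resolveDefPath(const std::string& suffix) {
  for (const char* var : {"CMSSW_BASE", "CMSSW_RELEASE_BASE"}) {
    const std::string base = envOrEmpty(var);
    if (base.empty()) continue;  // assumption: unset variables are skipped
    const std::string candidate = base + "/" + suffix;
    if (pathExists(candidate)) return candidate;
  }
  return suffix;
}
```

A call such as resolveDefPath("src/EventFilter/Utilities/plugins/budef.jsd") returns the first candidate that exists, or the suffix unchanged.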
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 184 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, mps_fire::i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

185 {
186  quit_threads_=true;
187 
188  //delete any remaining open files
189  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
190  deleteFile(it->second->fileName_);
191  delete it->second;
192  }
194  readSupervisorThread_->join();
195  }
196  else {
197  //join aux threads in case the supervisor thread was not started
198  for (unsigned int i=0;i<workerThreads_.size();i++) {
199  std::unique_lock<std::mutex> lk(mReader_);
200  thread_quit_signal[i]=true;
201  cvReader_[i]->notify_one();
202  lk.unlock();
203  workerThreads_[i]->join();
204  delete workerThreads_[i];
205  }
206  }
207  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
208  /*
209  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
210  InputChunk *ch;
211  while (!freeChunks_.try_pop(ch)) {}
212  delete ch;
213  }
214  */
215 }
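
The constructor starts each reader thread and blocks on startupCv_ until that thread has come up; the destructor sets a quit flag, notifies the readers and joins them. A minimal, self-contained sketch of that start-up and shutdown handshake follows. WorkerPool and its members are hypothetical names. A single quit flag and one condition variable stand in for the per-thread thread_quit_signal and cvReader_ entries, and the waits use predicates, which the listings above do not.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the reader-thread bookkeeping; not the CMSSW class.
class WorkerPool {
public:
  explicit WorkerPool(std::size_t n) {
    threads_.reserve(n);
    for (std::size_t i = 0; i < n; ++i) {
      std::unique_lock<std::mutex> lk(mutex_);
      threads_.emplace_back(&WorkerPool::run, this);
      // Block until worker i has reported in; the predicate guards
      // against spurious wakeups and against a notification sent early.
      startupCv_.wait(lk, [this, i] { return started_ == i + 1; });
    }
  }

  ~WorkerPool() {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      quit_ = true;  // set under the lock so no worker misses it
    }
    wakeCv_.notify_all();
    for (std::thread& t : threads_) t.join();
  }

  WorkerPool(const WorkerPool&) = delete;
  WorkerPool& operator=(const WorkerPool&) = delete;

  std::size_t started() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return started_;
  }

private:
  void run() {
    std::unique_lock<std::mutex> lk(mutex_);
    ++started_;
    startupCv_.notify_one();                     // tell the constructor we are up
    wakeCv_.wait(lk, [this] { return quit_; });  // idle until shutdown
  }

  mutable std::mutex mutex_;
  std::condition_variable startupCv_;
  std::condition_variable wakeCv_;
  bool quit_ = false;
  std::size_t started_ = 0;
  std::vector<std::thread> threads_;
};
```

Constructing WorkerPool pool(3) returns only after all three workers are running, and leaving the scope joins them.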

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 234 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, runEdmFileComparison::found, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

235 {
237  {
238  //late init of directory variable
240 
241  //this thread opens new files and dispatches reading to worker readers
242  //threadInit_.store(false,std::memory_order_release);
243  std::unique_lock<std::mutex> lk(startupLock_);
244  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
246  startupCv_.wait(lk);
247  }
248  //signal hltd to start event accounting
252  switch (nextEvent() ) {
254  //maybe create EoL file in working directory before ending run
255  struct stat buf;
256  if ( currentLumiSection_ > 0) {
257  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
258  if (eolFound) {
260  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261  if ( !found ) {
263  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
271  if (!eorFound) {
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
273  close(eor_fd);
274  }
276  eventsThisLumi_=0;
278  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
279  return false;
280  }
282  //this is not reachable
283  return true;
284  }
286  //std::cout << "--------------NEW LUMI---------------" << std::endl;
287  return true;
288  }
289  default: {
290  if (!getLSFromFilename_) {
291  //get new lumi from file header
292  if (event_->lumi() > currentLumiSection_) {
294  eventsThisLumi_=0;
295  maybeOpenNewLumiSection( event_->lumi() );
296  }
297  }
300  else
301  eventRunNumber_=event_->run();
302  L1EventID_ = event_->event();
303 
304  setEventCached();
305 
306  return true;
307  }
308  }
309 }
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 311 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

312 {
313   //used for backpressure mechanisms and monitoring
314   const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
315   struct stat buf;
316   if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
317     int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
318     close(bol_fd);
319   }
320 }
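
createBoLSFile() creates an empty marker file unless it is asked to check first and the file is already there. The same pattern can be sketched as a free function with POSIX calls. touchMarkerFile is a hypothetical name, and checking the descriptor returned by open() before closing it is an addition of this sketch, not something the listing does.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Creates an empty marker file. When checkIfExists is true and the file
// is already present, nothing is created. Returns true if the marker
// exists afterwards, false if it could not be created.
bool touchMarkerFile(const std::string& path, bool checkIfExists) {
  struct stat buf;
  if (checkIfExists && ::stat(path.c_str(), &buf) == 0) return true;

  const int fd = ::open(path.c_str(), O_RDWR | O_CREAT,
                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0) return false;  // assumption: report failure instead of calling close(-1)
  ::close(fd);
  return true;
}
```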
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 636 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, callgraph::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

637 {
638   //no deletion in file list mode
639   if (fileListMode_) return;
640 
641   const boost::filesystem::path filePath(fileName);
642   LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
643   try {
644     //sometimes this fails but file gets deleted
645     boost::filesystem::remove(filePath);
646   }
647   catch (const boost::filesystem::filesystem_error& ex)
648   {
649     edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
650       << ". Trying again.";
651     usleep(100000);
652     try {
653       boost::filesystem::remove(filePath);
654     }
655     catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
656   }
657   catch (std::exception& ex)
658   {
659     edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
660       << ". Trying again.";
661     usleep(100000);
662     try {
663       boost::filesystem::remove(filePath);
664     } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
665   }
666 }
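
deleteFile() removes the file, and if the removal reports an error it waits 100 ms and tries once more. A compact sketch of the same retry-once idea is given below. It swaps in C++17 std::filesystem and its non-throwing error_code overloads for the boost::filesystem exceptions used in the listing. removeWithRetry is a hypothetical name, and the 100 ms default mirrors the usleep(100000) call.

```cpp
#include <cassert>
#include <chrono>
#include <filesystem>
#include <system_error>
#include <thread>

// Removes a file (or empty directory). If the first attempt reports an
// error, waits for `pause` and tries once more.
// Returns true if the path is absent afterwards.
bool removeWithRetry(const std::filesystem::path& target,
                     std::chrono::milliseconds pause = std::chrono::milliseconds(100)) {
  std::error_code ec;
  std::filesystem::remove(target, ec);  // non-throwing overload
  if (ec) {
    std::this_thread::sleep_for(pause);
    ec.clear();
    std::filesystem::remove(target, ec);  // second and last attempt
  }
  std::error_code existsEc;
  return !std::filesystem::exists(target, existsEc) && !existsEc;
}
```

Returning a status rather than logging leaves the caller free to decide whether a leftover file matters.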
bool FedRawDataInputSource::exceptionState ( )
inline private

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 217 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

218 {
220  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
221  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
222  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
223  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
224  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
225  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
226  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
227  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
228  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
229  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
230  desc.setAllowAnything();
231  descriptions.add("source", desc);
232 }
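
eventChunkSize and eventChunkBlock are given in megabytes and multiplied by 1048576 in the constructor, and the block size must not exceed the chunk size. The arithmetic can be sketched as follows. ChunkLayout and makeChunkLayout are hypothetical names, and rejecting a chunk size that is not a whole multiple of the block size is an assumption of this sketch: the constructor lines that enforce "chunk size is N * block size" are not shown in the listing.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

constexpr std::uint64_t kBytesPerMiB = 1048576;

// Byte sizes derived from the two megabyte-valued parameters.
struct ChunkLayout {
  std::uint64_t chunkBytes;
  std::uint64_t blockBytes;
  std::uint64_t blocksPerChunk;
};

// Converts the configured sizes to bytes and checks that one chunk holds
// a whole number of read blocks (assumed policy: reject anything else).
ChunkLayout makeChunkLayout(unsigned int chunkMiB, unsigned int blockMiB) {
  if (blockMiB == 0 || blockMiB > chunkMiB)
    throw std::invalid_argument("eventChunkBlock must be in 1..eventChunkSize");
  if (chunkMiB % blockMiB != 0)
    throw std::invalid_argument("eventChunkSize must be a multiple of eventChunkBlock");
  return ChunkLayout{chunkMiB * kBytesPerMiB, blockMiB * kBytesPerMiB,
                     static_cast<std::uint64_t>(chunkMiB / blockMiB)};
}
```

With the defaults of 32 and 32, one chunk is 33554432 bytes and is filled by a single block read.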
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData)
private

Definition at line 743 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), l1t::stage2::layer2::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and ntuplemaker::time.

Referenced by read().

744 {
746  timeval stv;
747  gettimeofday(&stv,0);
748  time = stv.tv_sec;
749  time = (time << 32) + stv.tv_usec;
750  edm::Timestamp tstamp(time);
751 
752  uint32_t eventSize = event_->eventSize();
753  char* event = (char*)event_->payload();
754  GTPEventID_=0;
755  tcds_pointer_ = 0;
756  while (eventSize > 0) {
757  assert(eventSize>=sizeof(fedt_t));
758  eventSize -= sizeof(fedt_t);
759  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
760  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
761  assert(eventSize>=fedSize - sizeof(fedt_t));
762  eventSize -= (fedSize - sizeof(fedt_t));
763  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
764  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
765  if(fedId>FEDNumbering::MAXFEDID)
766  {
767  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
768  }
769  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
770  tcds_pointer_ = (unsigned char *)(event + eventSize );
771  }
772  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
773  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
774  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
775  else
776  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
777  //evf::evtn::evm_board_setformat(fedSize);
778  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
779  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
780  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
781  }
782  //take event ID from GTPE FED
783  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
784  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
785  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
786  }
787  }
788  FEDRawData& fedData = rawData.FEDData(fedId);
789  fedData.resize(fedSize);
790  memcpy(fedData.data(), event + eventSize, fedSize);
791  }
792  assert(eventSize == 0);
793 
794  return tstamp;
795 }
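
fillFEDRawDataCollection() walks the event payload from the end: it reads a trailer, takes the fragment length from it (counted in 8-byte words), steps back to the fragment header, reads the source id and copies the fragment out. The sketch below reproduces that backward walk on a simplified layout. FragHeader, FragTrailer, the two bit masks, appendFragment and splitFragments are assumptions made for illustration and should not be read as the definitions in fed_header.h or fed_trailer.h. The sketch throws where the listing uses assert.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Simplified 8-byte header and trailer (assumed layout for this sketch).
struct FragHeader  { std::uint32_t sourceid;  std::uint32_t eventid; };
struct FragTrailer { std::uint32_t conscheck; std::uint32_t eventsize; };

// Assumed encodings: length in 8-byte words in the low 24 bits of the
// trailer, source id in bits 8..19 of the header.
inline std::uint32_t lengthWords(const FragTrailer& t) { return t.eventsize & 0x00ffffffu; }
inline std::uint16_t sourceId(const FragHeader& h) {
  return static_cast<std::uint16_t>((h.sourceid >> 8) & 0xfffu);
}

// Test helper: appends header + zeroed payload + trailer to the buffer.
void appendFragment(std::vector<unsigned char>& buffer, std::uint16_t id,
                    std::uint32_t payloadWords) {
  const FragHeader header{static_cast<std::uint32_t>(id) << 8, 0};
  const FragTrailer trailer{0, payloadWords + 2};  // +2 words: header and trailer
  const unsigned char* h = reinterpret_cast<const unsigned char*>(&header);
  const unsigned char* t = reinterpret_cast<const unsigned char*>(&trailer);
  buffer.insert(buffer.end(), h, h + sizeof header);
  buffer.insert(buffer.end(), static_cast<std::size_t>(payloadWords) * 8,
                static_cast<unsigned char>(0));
  buffer.insert(buffer.end(), t, t + sizeof trailer);
}

// Walks the buffer backwards, one fragment per iteration, and returns
// the raw bytes of each fragment keyed by its source id.
std::map<std::uint16_t, std::vector<unsigned char>>
splitFragments(const unsigned char* event, std::size_t eventSize) {
  std::map<std::uint16_t, std::vector<unsigned char>> fragments;
  while (eventSize > 0) {
    if (eventSize < sizeof(FragTrailer)) throw std::runtime_error("truncated trailer");
    FragTrailer trailer;
    std::memcpy(&trailer, event + eventSize - sizeof trailer, sizeof trailer);
    const std::size_t fragSize = static_cast<std::size_t>(lengthWords(trailer)) << 3;
    if (fragSize < sizeof(FragHeader) + sizeof(FragTrailer) || fragSize > eventSize)
      throw std::runtime_error("inconsistent fragment length");
    eventSize -= fragSize;  // eventSize is now the offset of this fragment's header
    FragHeader header;
    std::memcpy(&header, event + eventSize, sizeof header);
    fragments[sourceId(header)].assign(event + eventSize, event + eventSize + fragSize);
  }
  return fragments;
}
```

Because every step consumes exactly one fragment, the loop ends with eventSize equal to zero only when the trailer lengths tile the payload exactly, which is the condition the listing asserts at line 792.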
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int lumi, bool erase )

Definition at line 1382 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1383 {
1384  std::lock_guard<std::mutex> lock(monlock_);
1385  auto itr = sourceEventsReport_.find(lumi);
1386  if (itr!=sourceEventsReport_.end()) {
1387  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1388  if (erase)
1389  sourceEventsReport_.erase(itr);
1390  return ret;
1391  }
1392  else
1393  return std::pair<bool,unsigned int>(false,0);
1394 }
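The listing above is a find-and-optionally-erase lookup performed under a mutex. A minimal standalone sketch of the same pattern follows; `LumiEventCounter` and its `report` method are hypothetical and only stand in for the class members shown in the listing.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the per-lumisection event counter.
class LumiEventCounter {
public:
  // Stores the number of events seen for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns (true, count) if the lumisection is known, (false, 0) otherwise.
  // When erase is true, the entry is removed after being read.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0};
    const std::pair<bool, unsigned int> ret{true, itr->second};
    if (erase) counts_.erase(itr);
    return ret;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

The value is copied before the erase, so the returned pair stays valid after the map entry is gone.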
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int & ls, std::string & nextFile, uint32_t & fsize, uint64_t & lockWaitTime )
private

Definition at line 1425 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, callgraph::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1426 {
1427  if (fileListIndex_ < fileNames_.size()) {
1428  nextFile = fileNames_[fileListIndex_];
1429  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1430  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1431  boost::filesystem::path fileName = nextFile;
1432  std::string fileStem = fileName.stem().string();
1433  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1434  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1435 
1436  if (!fileListLoopMode_)
1437  ls = boost::lexical_cast<unsigned int>(fileStem);
1438  else //always starting from LS 1 in loop mode
1439  ls = 1 + loopModeIterationInc_;
1440 
1441  //fsize = 0;
1442  //lockWaitTime = 0;
1443  fileListIndex_++;
1445  }
1446  else {
1447  if (!fileListLoopMode_)
1449  else {
1450  //loop through files until interrupted
1452  fileListIndex_=0;
1453  return getFile(ls,nextFile,fsize,lockWaitTime);
1454  }
1455  }
1456 }
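In file-list mode the listing above derives the lumisection number from the text that follows "ls" in the file stem, up to the next underscore. Note that `std::string::find` returns `std::string::npos`, a non-zero value, when nothing is found, so a test of the form `if (s.find("ls"))` is also true for a miss. The sketch below compares against `npos` explicitly. `lumiFromStem` is a hypothetical helper, and the stem pattern in the comment is an assumed naming convention.

```cpp
#include <cassert>
#include <exception>
#include <string>

// Extracts the number that follows "ls" in a stem such as
// "run000123_ls0042_index000001" (assumed naming pattern).
// Returns false if there is no "ls" token or it is not followed by digits only.
bool lumiFromStem(const std::string& stem, unsigned int& ls) {
  const std::string::size_type pos = stem.find("ls");
  if (pos == std::string::npos) return false;
  const std::string::size_type begin = pos + 2;
  std::string::size_type end = stem.find('_', begin);
  if (end == std::string::npos) end = stem.size();
  const std::string digits = stem.substr(begin, end - begin);
  if (digits.empty() || digits.find_first_not_of("0123456789") != std::string::npos)
    return false;
  try {
    ls = static_cast<unsigned int>(std::stoul(digits));
  } catch (const std::exception&) {  // e.g. std::out_of_range for very long digit runs
    return false;
  }
  return true;
}
```

Returning a flag instead of throwing lets the caller decide whether a malformed name is fatal.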
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 373 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, threadError(), mps_check::timeout, verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

374 {
375 
377  if (!currentFile_)
378  {
381  if (!fileQueue_.try_pop(currentFile_))
382  {
383  //sleep until wakeup (only in single-buffer mode) or timeout
384  std::unique_lock<std::mutex> lkw(mWakeup_);
385  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
387  }
388  status = currentFile_->status_;
389  if ( status == evf::EvFDaqDirector::runEnded)
390  {
392  delete currentFile_;
393  currentFile_=nullptr;
394  return status;
395  }
396  else if ( status == evf::EvFDaqDirector::runAbort)
397  {
398  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
399  }
400  else if (status == evf::EvFDaqDirector::newLumi)
401  {
403  if (getLSFromFilename_) {
406  eventsThisLumi_=0;
408  }
409  }
410  else {//let this be picked up from next event
412  }
413 
414  delete currentFile_;
415  currentFile_=nullptr;
416  return status;
417  }
418  else if (status == evf::EvFDaqDirector::newFile) {
420  }
421  else
422  assert(0);
423  }
425 
426  //file is empty
427  if (!currentFile_->fileSize_) {
429  //try to open new lumi
430  assert(currentFile_->nChunks_==0);
431  if (getLSFromFilename_)
434  eventsThisLumi_=0;
436  }
437  //immediately delete empty file
439  delete currentFile_;
440  currentFile_=nullptr;
442  }
443 
444  //file is finished
447  //release last chunk (it is never released elsewhere)
450  {
451  throw cms::Exception("FedRawDataInputSource::getNextEvent")
452  << "Fully processed " << currentFile_->nProcessed_
453  << " from the file " << currentFile_->fileName_
454  << " but according to BU JSON there should be "
455  << currentFile_->nEvents_ << " events";
456  }
457  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
458  if (singleBufferMode_) {
459  std::unique_lock<std::mutex> lkw(mWakeup_);
460  cvWakeup_.notify_one();
461  }
464  //put the file in pending delete list;
465  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
466  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
467  }
468  else {
469  //in single-thread and stream jobs, events are already processed
471  delete currentFile_;
472  }
473  currentFile_=nullptr;
475  }
476 
477 
478  //file is too short
480  {
481  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
482  "Premature end of input file while reading event header";
483  }
484  if (singleBufferMode_) {
485 
486  //should already be there
489  usleep(10000);
491  }
493 
494  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
495 
496  //conditions when read amount is not sufficient for the header to fit
497  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
499  {
501 
502  if (detectedFRDversion_==0) {
503  detectedFRDversion_=*((uint32*)dataPosition);
504  if (detectedFRDversion_>5)
505  throw cms::Exception("FedRawDataInputSource::getNextEvent")
506  << "Unknown FRD version -: " << detectedFRDversion_;
507  assert(detectedFRDversion_>=1);
508  }
509 
510  //recalculate chunk position
511  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
512  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
513  {
514  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
515  "Premature end of input file while reading event header";
516  }
517  }
518 
519  event_.reset( new FRDEventMsgView(dataPosition) );
520  if (event_->size()>eventChunkSize_) {
521  throw cms::Exception("FedRawDataInputSource::getNextEvent")
522  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
523  << " run:" << event_->run() << " of size:" << event_->size()
524  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
525  }
526 
527  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
528 
530  {
531  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
532  "Premature end of input file while reading event data";
533  }
534  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
536  //recalculate chunk position
537  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
538  event_.reset( new FRDEventMsgView(dataPosition) );
539  }
540  currentFile_->bufferPosition_ += event_->size();
541  currentFile_->chunkPosition_ += event_->size();
542  //last chunk is released when this function is invoked next time
543 
544  }
545  //multibuffer mode:
546  else
547  {
548  //wait for the current chunk to become added to the vector
551  usleep(10000);
553  }
555 
556  //check if header is at the boundary of two chunks
557  chunkIsFree_ = false;
558  unsigned char *dataPosition;
559 
560  //read header, copy it to a single chunk if necessary
561  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
562 
563  event_.reset( new FRDEventMsgView(dataPosition) );
564  if (event_->size()>eventChunkSize_) {
565  throw cms::Exception("FedRawDataInputSource::getNextEvent")
566  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
567  << " run:" << event_->run() << " of size:" << event_->size()
568  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
569  }
570 
571  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
572 
574  {
575  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
576  "Premature end of input file while reading event data";
577  }
578 
579  if (chunkEnd) {
580  //header was at the chunk boundary, we will have to move payload as well
581  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
582  chunkIsFree_ = true;
583  }
584  else {
585  //header was contiguous, but check if payload fits the chunk
586  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
587  //rewind to header start position
588  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
589  //copy event to a chunk start and move pointers
590  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
591  assert(chunkEnd);
592  chunkIsFree_=true;
593  //header is moved
594  event_.reset( new FRDEventMsgView(dataPosition) );
595  }
596  else {
597  //everything is in a single chunk, only move pointers forward
598  chunkEnd = currentFile_->advance(dataPosition,msgSize);
599  assert(!chunkEnd);
600  chunkIsFree_=false;
601  }
602  }
603  }//end multibuffer mode
605 
606  if (verifyChecksum_ && event_->version() >= 5)
607  {
608  uint32_t crc=0;
609  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
610  if ( crc != event_->crc32c() ) {
612  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
613  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
614  " but calculated 0x" << crc;
615  }
616  }
617  else if ( verifyAdler32_ && event_->version() >= 3)
618  {
619  uint32_t adler = adler32(0L,Z_NULL,0);
620  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
621 
622  if ( adler != event_->adler32() ) {
624  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
625  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
626  " but calculated 0x" << adler;
627  }
628  }
630 
632 
634 }
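The final part of the listing above recomputes a checksum over the event payload and compares it with the value stored in the event header. As an illustration of the Adler-32 branch, here is a minimal sketch of the algorithm without zlib. `adler32Update` and `payloadMatches` are hypothetical names. The initial value 1 corresponds to what zlib's `adler32(0L, Z_NULL, 0)` returns.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Continues an Adler-32 checksum over buf; pass 1 as the initial value.
std::uint32_t adler32Update(std::uint32_t adler, const unsigned char* buf, std::size_t len) {
  const std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = adler & 0xFFFFu;
  std::uint32_t b = (adler >> 16) & 0xFFFFu;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + buf[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Returns true if the checksum of the payload equals the expected value.
bool payloadMatches(const unsigned char* payload, std::size_t len, std::uint32_t expected) {
  return adler32Update(1, payload, len) == expected;
}
```

This version takes the modulo on every byte for clarity; optimized implementations defer it across blocks of input.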
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 797 of file FedRawDataInputSource.cc.

References popcon2dropbox::copy(), daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, Json::Reader::parse(), callgraph::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

798 {
800  try {
801  // assemble json destination path
803 
804  //TODO:should be ported to use fffnaming
805  std::ostringstream fileNameWithPID;
806  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
807  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
808  jsonDestPath /= fileNameWithPID.str();
809 
810  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
811  << jsonDestPath;
812  try {
813  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
814  }
815  catch (const boost::filesystem::filesystem_error& ex)
816  {
817  // Input dir gone?
818  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
819  // << " Maybe the file is not yet visible by FU. Trying again in one second";
820  sleep(1);
821  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
822  }
824 
825  try {
826  //sometimes this fails but file gets deleted
827  boost::filesystem::remove(jsonSourcePath);
828  }
829  catch (const boost::filesystem::filesystem_error& ex)
830  {
831  // Input dir gone?
832  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
833  }
834  catch (std::exception& ex)
835  {
836  // Input dir gone?
837  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
838  }
839 
840  boost::filesystem::ifstream ij(jsonDestPath);
841  Json::Value deserializeRoot;
843 
844  std::stringstream ss;
845  ss << ij.rdbuf();
846  if (!reader.parse(ss.str(), deserializeRoot)) {
847  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
848  << "\nERROR:\n" << reader.getFormatedErrorMessages()
849  << "CONTENT:\n" << ss.str()<<".";
850  throw std::runtime_error("Cannot deserialize input JSON file");
851  }
852 
853  //read BU JSON
855  DataPoint dp;
856  dp.deserialize(deserializeRoot);
857  bool success = false;
858  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
859  if (dpd_->getNames().at(i)=="NEvents")
860  if (i<dp.getData().size()) {
861  data = dp.getData()[i];
862  success=true;
863  }
864  }
865  if (!success) {
866  if (dp.getData().size())
867  data = dp.getData()[0];
868  else
869  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
870  " error reading number of events from BU JSON -: No input value " << data;
871  }
872  return boost::lexical_cast<int>(data);
873 
874  }
875  catch (const boost::filesystem::filesystem_error& ex)
876  {
877  // Input dir gone?
879  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
880  }
881  catch (std::runtime_error e)
882  {
883  // Another process grabbed the file and NFS did not register this
885  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
886  }
887 
888  catch( boost::bad_lexical_cast const& ) {
889  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
890  << "Input value is -: " << data;
891  }
892 
893  catch (std::exception e)
894  {
895  // BU run directory disappeared?
897  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
898  }
899 
900  return -1;
901 }
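After parsing the JSON file, the listing above looks for the value at the position of the "NEvents" name, falls back to the first value if that fails, and converts the result to an integer. The sketch below shows that selection on plain vectors. `eventCount` is a hypothetical helper, and returning -1 for every failure is a simplification of the error handling in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <string>
#include <vector>

// Picks the value at the position of "NEvents" in names, or the first value
// if that name has no matching value. Returns -1 if nothing usable is found.
int eventCount(const std::vector<std::string>& names, const std::vector<std::string>& values) {
  std::string text;
  bool found = false;
  for (std::size_t i = 0; i < names.size() && i < values.size(); ++i) {
    if (names[i] == "NEvents") {
      text = values[i];
      found = true;
    }
  }
  if (!found) {
    if (values.empty()) return -1;
    text = values[0];  // fallback: first value
  }
  try {
    std::size_t used = 0;
    const int n = std::stoi(text, &used);
    return used == text.size() ? n : -1;  // reject trailing garbage such as "12x"
  } catch (const std::exception&) {
    return -1;  // not a number, or out of range for int
  }
}
```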
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1396 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, callgraph::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1397 {
1398  std::sort(fileNames_.begin(),fileNames_.end(),
1399  [](std::string a, std::string b) {
1400  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1401  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1402  return b > a;});
1403 
1404  if (fileNames_.size()) {
1405  //get run number from first file in the vector
1407  std::string fileStem = fileName.stem().string();
1408  auto end = fileStem.find("_");
1409  if (fileStem.find("run")==0) {
1410  std::string runStr = fileStem.substr(3,end-3);
1411  try {
1412  //get long to support test run numbers < 2^32
1413  long rval = boost::lexical_cast<long>(runStr);
1414  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1415  return rval;
1416  }
1417  catch( boost::bad_lexical_cast const& ) {
1418  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1419  }
1420  }
1421  }
1422  return -1;
1423 }
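The listing above sorts the file list by the name after the last "/" and then reads the run number from a stem that starts with "run". A minimal sketch of both steps follows. `baseName`, `baseNameLess` and `runFromStem` are hypothetical helpers, and the file names used with them are assumed examples.

```cpp
#include <algorithm>
#include <cassert>
#include <exception>
#include <string>
#include <vector>

// Returns the part of the path after the last '/', or the whole path.
std::string baseName(const std::string& path) {
  const std::string::size_type slash = path.rfind('/');
  return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Orders paths by file name only, ignoring the directory part.
bool baseNameLess(const std::string& a, const std::string& b) {
  return baseName(a) < baseName(b);
}

// Returns the number between "run" and the first '_' of the stem, or -1.
long runFromStem(const std::string& stem) {
  if (stem.compare(0, 3, "run") != 0) return -1;
  const std::string::size_type end = stem.find('_');
  const std::string digits =
      stem.substr(3, end == std::string::npos ? std::string::npos : end - 3);
  if (digits.empty() || digits.find_first_not_of("0123456789") != std::string::npos)
    return -1;
  try {
    return std::stol(digits);
  } catch (const std::exception&) {
    return -1;
  }
}
```

With `std::sort(files.begin(), files.end(), baseNameLess)` the files are ordered by name even if they live in different directories.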
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 322 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, runEdmFileComparison::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

323 {
325  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
326 
327  if ( currentLumiSection_ > 0) {
328  const std::string fuEoLS =
330  struct stat buf;
331  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
332  if ( !found ) {
334  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
335  close(eol_fd);
336  createBoLSFile(lumiSection,false);
338  }
339  }
340  else createBoLSFile(lumiSection,true);//needed for initial lumisection
341 
342  currentLumiSection_ = lumiSection;
343 
345 
346  timeval tv;
347  gettimeofday(&tv, 0);
348  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
349 
350  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
352  runAuxiliary()->run(),
353  lumiSection, lsopentime,
355 
356  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
357  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
358 
359  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
360  }
361 }
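The lumisection open time in the listing above is built from a `timeval` as seconds times 1000000 plus microseconds. The sketch below shows the same arithmetic, together with a `std::chrono` way of obtaining the current value. Both function names are hypothetical, and the sketch says nothing about how `edm::Timestamp` interprets the number.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Combines whole seconds and a microsecond remainder into one 64-bit count.
std::uint64_t toMicroseconds(std::uint64_t sec, std::uint64_t usec) {
  return sec * 1000000ULL + usec;
}

// Current wall-clock time in microseconds since the clock's epoch
// (the Unix epoch on common platforms; guaranteed only since C++20).
std::uint64_t nowMicroseconds() {
  const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
  return static_cast<std::uint64_t>(
      std::chrono::duration_cast<std::chrono::microseconds>(sinceEpoch).count());
}
```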
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 363 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNextEvent().

364 {
366  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
367  {
368  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
369  }
370  return status;
371 }
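The loop in the listing above keeps calling `getNextEvent()` while it reports `noFile` and leaves early once the shutdown flag is set. A minimal sketch of that polling pattern, with a hypothetical `FileStatus` enum and a callable standing in for the event source:

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Hypothetical, reduced status set for illustration.
enum class FileStatus { noFile, newFile, runEnded };

// Calls getNext until it returns something other than noFile,
// or until the shutdown flag is observed after a noFile result.
FileStatus pollUntilReady(const std::function<FileStatus()>& getNext,
                          const std::atomic<bool>& shutdown) {
  FileStatus status;
  while ((status = getNext()) == FileStatus::noFile) {
    if (shutdown.load(std::memory_order_relaxed)) break;
  }
  return status;
}
```

The flag is checked only after a `noFile` result, so a pending event is still returned even when shutdown has already been requested.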
void FedRawDataInputSource::read ( edm::EventPrincipal & eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 669 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTracker_, edm::EventPrincipal::streamID(), tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

670 {
672  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
673  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
674 
675  if (useL1EventID_){
677  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
679  aux.setProcessHistoryID(processHistoryID_);
680  makeEvent(eventPrincipal, aux);
681  }
682  else if(tcds_pointer_==0){
683  assert(GTPEventID_);
685  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
687  aux.setProcessHistoryID(processHistoryID_);
688  makeEvent(eventPrincipal, aux);
689  }
690  else{
691  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
696  makeEvent(eventPrincipal, aux);
697  }
698 
699 
700 
701  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
702 
703  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
705 
706  eventsThisLumi_++;
708 
709  //resize vector if needed
710  while (streamFileTracker_.size() <= eventPrincipal.streamID())
711  streamFileTracker_.push_back(-1);
712 
714 
715  //this old file check runs no more often than every 10 events
716  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
717  //delete files that are not in processing
718  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
719  auto it = filesToDelete_.begin();
720  while (it!=filesToDelete_.end()) {
721  bool fileIsBeingProcessed = false;
722  for (unsigned int i=0;i<nStreams_;i++) {
723  if (it->first == streamFileTracker_.at(i)) {
724  fileIsBeingProcessed = true;
725  break;
726  }
727  }
728  if (!fileIsBeingProcessed) {
729  deleteFile(it->second->fileName_);
730  delete it->second;
731  it = filesToDelete_.erase(it);
732  }
733  else it++;
734  }
735 
736  }
738  chunkIsFree_=false;
740  return;
741 }
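The cleanup block near the end of the listing above walks the pending-delete list and removes every file whose index is no longer tracked by any stream. The sketch below reproduces that erase-while-iterating pattern on standard containers. `purgeUnused` is a hypothetical helper, and file names stand in for the `InputFile` pointers.

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Removes every pending (file index, file name) entry whose index does not
// appear in streamFileTracker; returns the names that were removed.
std::vector<std::string> purgeUnused(std::list<std::pair<int, std::string>>& pending,
                                     const std::vector<int>& streamFileTracker) {
  std::vector<std::string> removed;
  for (auto it = pending.begin(); it != pending.end();) {
    const bool inUse = std::find(streamFileTracker.begin(), streamFileTracker.end(),
                                 it->first) != streamFileTracker.end();
    if (inUse) {
      ++it;
    } else {
      removed.push_back(it->second);
      it = pending.erase(it);  // erase returns the iterator to the next element
    }
  }
  return removed;
}
```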
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile * file)
private

Definition at line 1317 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, mps_fire::i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1318 {
1319 
1320  if (fileDescriptor_<0) {
1321  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1322  bufferInputRead_ = 0;
1323  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1324  if (fileDescriptor_>=0)
1325  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1326  else
1327  {
1328  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1329  << file->fileName_ << " fd:" << fileDescriptor_;
1330  }
1331  }
1332 
1333  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1334  uint32_t existingSize = 0;
1335  for (unsigned int i=0;i<readBlocks_;i++)
1336  {
1337  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1338  bufferInputRead_+=last;
1339  existingSize+=last;
1340  }
1341  }
1342  else {
1343  const uint32_t chunksize = file->chunkPosition_;
1344  const uint32_t blockcount=chunksize/eventChunkBlock_;
1345  const uint32_t leftsize = chunksize%eventChunkBlock_;
1346  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1347  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1348 
1349  for (uint32_t i=0;i<blockcount;i++) {
1350  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1351  bufferInputRead_+=last;
1352  existingSize+=last;
1353  }
1354  if (leftsize) {
1355  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1356  bufferInputRead_+=last;
1357  }
1358  file->chunkPosition_=0;//data was moved to beginning of the chunk
1359  }
1360  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1361  if (fileDescriptor_!=-1)
1362  {
1363  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1364  close(fileDescriptor_);
1365  fileDescriptor_=-1;
1366  }
1367  }
1368 }
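In single-buffer mode the listing above moves the unread tail of the chunk to the start of the buffer and refills the rest from the file. The sketch below shows that compact-and-refill step on an in-memory stream instead of a file descriptor. `compactAndRefill` is a hypothetical helper. Unlike the listing, which adds the result of `::read` to its counters directly, the sketch relies on `gcount()`, which is never negative.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Moves the unread bytes [pos, valid) to the start of buf, refills the
// remainder from in, and returns the number of valid bytes now in buf.
std::size_t compactAndRefill(std::vector<char>& buf, std::size_t pos, std::size_t valid,
                             std::istream& in) {
  assert(pos <= valid && valid <= buf.size());
  const std::size_t leftover = valid - pos;
  std::memmove(buf.data(), buf.data() + pos, leftover);  // regions may overlap
  in.read(buf.data() + leftover, static_cast<std::streamsize>(buf.size() - leftover));
  return leftover + static_cast<std::size_t>(in.gcount());
}
```

`memmove` rather than `memcpy` is needed here because the source and destination ranges can overlap when more than half of the buffer is still unread.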
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 907 of file FedRawDataInputSource.cc.

References InputFile::chunks_, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), mps_fire::i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, callgraph::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, trackingPlots::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

908 {
909  bool stop=false;
910  unsigned int currentLumiSection = 0;
911  //threadInit_.exchange(true,std::memory_order_acquire);
912 
913  {
914  std::unique_lock<std::mutex> lk(startupLock_);
915  startupCv_.notify_one();
916  }
917 
918  uint32_t ls=0;
919  uint32_t monLS=1;
920  uint32_t lockCount=0;
921  uint64_t sumLockWaitTimeUs=0.;
922 
923  while (!stop) {
924 
925  //wait for at least one free thread and chunk
926  int counter=0;
928  {
929  //report state to monitoring
930  if (fms_) {
931  bool copy_active=false;
932  for (auto j : tid_active_) if (j) copy_active=true;
934  else if (freeChunks_.empty()) {
935  if (copy_active)
937  else
939  }
940  else {
941  if (copy_active)
943  else
945  }
946  }
947  std::unique_lock<std::mutex> lkw(mWakeup_);
948  //sleep until woken up by condition or a timeout
949  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
950  counter++;
951  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
952  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
953  }
954  else {
955  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
956  }
957  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
958  }
959 
960  if (stop) break;
961 
962  //look for a new file
963  std::string nextFile;
964  uint32_t fileSize;
965 
966  if (fms_) {
970  }
971 
973 
974  while (status == evf::EvFDaqDirector::noFile) {
975  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
976  stop=true;
977  break;
978  }
979 
980  uint64_t thisLockWaitTimeUs=0.;
981  if (fileListMode_) {
982  //return LS if LS not set, otherwise return file
983  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
984  }
985  else
986  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
987 
989 
990  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
991 
992  //monitoring of lock wait time
993  if (thisLockWaitTimeUs>0.)
994  sumLockWaitTimeUs+=thisLockWaitTimeUs;
995  lockCount++;
996  if (ls>monLS) {
997  monLS=ls;
998  if (lockCount)
999  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1000  lockCount=0;
1001  sumLockWaitTimeUs=0;
1002  }
1003 
1004  //check again for any remaining index/EoLS files after EoR file is seen
1005  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1007  usleep(100000);
1008  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1009  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1010  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1011  }
1012 
1013  if ( status == evf::EvFDaqDirector::runEnded) {
1015  stop=true;
1016  break;
1017  }
1018 
1019  //queue new lumisection
1020  if( getLSFromFilename_ && ls > currentLumiSection) {
1021  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1022  currentLumiSection = ls;
1023  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1024  }
1025 
1026  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1027  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1029  stop=true;
1030  break;
1031  }
1032 
1033  int dbgcount=0;
1034  if (status == evf::EvFDaqDirector::noFile) {
1036  dbgcount++;
1037  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1038  usleep(100000);
1039  }
1040  }
1041  if ( status == evf::EvFDaqDirector::newFile ) {
1043  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1044 
1045 
1046  boost::filesystem::path rawFilePath(nextFile);
1047  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1048 
1049  struct stat st;
1050  int stat_res = stat(rawFile.c_str(),&st);
1051  if (stat_res==-1) {
1052  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1053  setExceptionState_=true;
1054  break;
1055  }
1056  fileSize=st.st_size;
1057 
1058  if (fms_) {
1062  }
1063  int eventsInNewFile;
1064  if (fileListMode_) {
1065  if (fileSize==0) eventsInNewFile=0;
1066  else eventsInNewFile=-1;
1067  }
1068  else {
1069  eventsInNewFile = grabNextJsonFile(nextFile);
1070  assert( eventsInNewFile>=0 );
1071  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1072  }
1073 
1074  if (!singleBufferMode_) {
1075  //calculate number of needed chunks
1076  unsigned int neededChunks = fileSize/eventChunkSize_;
1077  if (fileSize%eventChunkSize_) neededChunks++;
1078 
1079  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1081  fileQueue_.push(newInputFile);
1082 
1083  for (unsigned int i=0;i<neededChunks;i++) {
1084 
1085  if (fms_) {
1086  bool copy_active=false;
1087  for (auto j : tid_active_) if (j) copy_active=true;
1088  if (copy_active)
1090  else
1092  }
1093  //get thread
1094  unsigned int newTid = 0xffffffff;
1095  while (!workerPool_.try_pop(newTid)) {
1096  usleep(100000);
1097  }
1098 
1099  if (fms_) {
1100  bool copy_active=false;
1101  for (auto j : tid_active_) if (j) copy_active=true;
1102  if (copy_active)
1104  else
1106  }
1107  InputChunk * newChunk = nullptr;
1108  while (!freeChunks_.try_pop(newChunk)) {
1109  usleep(100000);
1110  if (quit_threads_.load(std::memory_order_relaxed)) break;
1111  }
1112 
1113  if (newChunk == nullptr) {
1114  //return unused tid if we received shutdown (nullptr chunk)
1115  if (newTid!=0xffffffff) workerPool_.push(newTid);
1116  stop = true;
1117  break;
1118  }
1120 
1121  std::unique_lock<std::mutex> lk(mReader_);
1122 
1123  unsigned int toRead = eventChunkSize_;
1124  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1125  newChunk->reset(i*eventChunkSize_,toRead,i);
1126 
1127  workerJob_[newTid].first=newInputFile;
1128  workerJob_[newTid].second=newChunk;
1129 
1130  //wake up the worker thread
1131  cvReader_[newTid]->notify_one();
1132  }
1133  }
1134  else {
1135  if (!eventsInNewFile) {
1136  //still queue file for lumi update
1137  std::unique_lock<std::mutex> lkw(mWakeup_);
1138  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1140  fileQueue_.push(newInputFile);
1141  cvWakeup_.notify_one();
1142  break;
1143  }
1144  //in single-buffer mode put single chunk in the file and let the main thread read the file
1145  InputChunk * newChunk;
1146  //should be available immediately
1147  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1148 
1149  std::unique_lock<std::mutex> lkw(mWakeup_);
1150 
1151  unsigned int toRead = eventChunkSize_;
1152  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1153  newChunk->reset(0,toRead,0);
1154  newChunk->readComplete_=true;
1155 
1156  //push file and wakeup main thread
1157  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1158  newInputFile->chunks_[0]=newChunk;
1160  fileQueue_.push(newInputFile);
1161  cvWakeup_.notify_one();
1162  }
1163  }
1164  }
1166  //make sure threads finish reading
1167  unsigned numFinishedThreads = 0;
1168  while (numFinishedThreads < workerThreads_.size()) {
1169  unsigned int tid;
1170  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1171  std::unique_lock<std::mutex> lk(mReader_);
1172  thread_quit_signal[tid]=true;
1173  cvReader_[tid]->notify_one();
1174  numFinishedThreads++;
1175  }
1176  for (unsigned int i=0;i<workerThreads_.size();i++) {
1177  workerThreads_[i]->join();
1178  delete workerThreads_[i];
1179  }
1180 }
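
The chunk bookkeeping in readSupervisor() (lines 1076-1077 and 1123-1125 of the listing) amounts to a ceiling division plus a shortened final chunk. The following is a minimal standalone sketch of that arithmetic, not the class's own code: the ChunkPlan struct and the planChunks() helper are hypothetical names introduced for illustration, and the offset is widened to 64 bits where the listing uses unsigned int.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper type (not part of FedRawDataInputSource):
// describes one chunk of a raw file.
struct ChunkPlan {
  uint64_t offset;  // byte offset of the chunk within the file
  uint32_t toRead;  // number of bytes to read for this chunk
  unsigned index;   // position of the chunk in the file's chunk vector
};

// Split a file of fileSize bytes into chunks of at most chunkSize bytes.
std::vector<ChunkPlan> planChunks(uint32_t fileSize, uint32_t chunkSize) {
  assert(chunkSize > 0);
  // Ceiling division: a partial trailing chunk counts as one more chunk.
  unsigned needed = fileSize / chunkSize;
  if (fileSize % chunkSize) needed++;

  std::vector<ChunkPlan> plan;
  plan.reserve(needed);
  for (unsigned i = 0; i < needed; i++) {
    uint32_t toRead = chunkSize;
    // Only the last chunk can be shorter than chunkSize.
    if (i == needed - 1 && fileSize % chunkSize) toRead = fileSize % chunkSize;
    plan.push_back({static_cast<uint64_t>(i) * chunkSize, toRead, i});
  }
  return plan;
}
```

For a 10-byte file and 4-byte chunks, planChunks(10, 4) yields three chunks at offsets 0, 4 and 8, the last one holding 2 bytes. An empty file yields no chunks.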
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1182 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, mps_update::diff, end, eventChunkBlock_, FrontierConditions_GlobalTag_cff::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, mps_fire::i, init, plotBeamSpotDB::last, LogDebug, mReader_, cmsPerfSuiteHarvest::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1183 {
1184  bool init = true;
1185  threadInit_.exchange(true,std::memory_order_acquire);
1186 
1187  while (1) {
1188 
1189  tid_active_[tid]=false;
1190  std::unique_lock<std::mutex> lk(mReader_);
1191  workerJob_[tid].first=nullptr;
1192  workerJob_[tid].second=nullptr;
1193 
1194  assert(!thread_quit_signal[tid]);//should never get it here
1195  workerPool_.push(tid);
1196 
1197  if (init) {
1198  std::unique_lock<std::mutex> lk(startupLock_);
1199  init = false;
1200  startupCv_.notify_one();
1201  }
1202  cvReader_[tid]->wait(lk);
1203 
1204  if (thread_quit_signal[tid]) return;
1205  tid_active_[tid]=true;
1206 
1207  InputFile * file;
1208  InputChunk * chunk;
1209 
1210  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1211 
1212  file = workerJob_[tid].first;
1213  chunk = workerJob_[tid].second;
1214 
1215  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1216  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1217 
1218 
1219  if (fileDescriptor>=0)
1220  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1221  else
1222  {
1223  edm::LogError("FedRawDataInputSource") <<
1224  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1225  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1226  setExceptionState_=true;
1227  return;
1228 
1229  }
1230 
1231  unsigned int bufferLeft = 0;
1233  for (unsigned int i=0;i<readBlocks_;i++)
1234  {
1235  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1236  if ( last > 0 )
1237  bufferLeft+=last;
1238  if (last < eventChunkBlock_) {
1239  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1240  break;
1241  }
1242  }
1244  auto diff = end-start;
1245  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1246  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1247  close(fileDescriptor);
1248 
1249  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1250  assert(detectedFRDversion_<=5);
1251  chunk->readComplete_=true;//atomic store: the buffer must be completely filled before the chunk becomes available for processing
1252  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1253 
1254  }
1255 }
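
The block-wise read loop in readWorker() (lines 1233-1242 of the listing) can be illustrated in isolation. This is a hedged sketch rather than the class's implementation: readInBlocks() is a hypothetical free function, and unlike the listing it returns -1 when read() reports an error instead of relying on a later assertion.

```cpp
#include <cassert>
#include <cstddef>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical helper (not part of FedRawDataInputSource).
// Reads up to maxBlocks blocks of blockSize bytes from fd into buf and
// returns the number of bytes stored, or -1 if read() reported an error.
// The caller must supply a buffer of at least maxBlocks * blockSize bytes.
ssize_t readInBlocks(int fd, unsigned char* buf, size_t blockSize, unsigned maxBlocks) {
  assert(blockSize > 0);
  size_t filled = 0;
  for (unsigned i = 0; i < maxBlocks; i++) {
    const ssize_t last = ::read(fd, buf + filled, blockSize);
    if (last < 0) return -1;  // I/O error: report it instead of continuing
    filled += static_cast<size_t>(last);
    // A short read is treated as end of data, as in the listing.
    if (static_cast<size_t>(last) < blockSize) break;
  }
  return static_cast<ssize_t>(filled);
}
```

Treating a short read as end of data is reasonable for regular files such as the .raw files read here. For pipes or sockets a short read does not by itself imply end of input, so this loop should not be reused there without changes.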
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int lumi, unsigned int events )
private

Definition at line 1371 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1372 {
1373 
1374  std::lock_guard<std::mutex> lock(monlock_);
1375  auto itr = sourceEventsReport_.find(lumi);
1376  if (itr!=sourceEventsReport_.end())
1377  itr->second+=events;
1378  else
1380 }
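
reportEventsThisLumiInSource() accumulates a per-lumisection event count under a mutex, and getEventReport(lumi, erase) reads it back. The sketch below shows one way such a mutex-guarded counter map can work. LumiEventReport and its member names are hypothetical, and the read-back semantics (a found flag plus the count, with optional erase) are an assumption based only on the getEventReport() signature, not on its source.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the source's per-lumi bookkeeping
// (monlock_ and sourceEventsReport_ in the class).
class LumiEventReport {
public:
  // Add `events` to the running count for `lumi`.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] += events;  // operator[] value-initializes a new entry to 0
  }

  // Return {found, count}; optionally erase the entry once it has been read.
  // Assumed semantics, modelled on the getEventReport(lumi, erase) signature.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0};
    std::pair<bool, unsigned int> result(true, itr->second);
    if (erase) counts_.erase(itr);
    return result;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Using operator[] folds the find/else branches of the listing into a single statement, because a missing key is created with a count of zero before the addition.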
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 903 of file FedRawDataInputSource.cc.

904 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1257 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by getNextEvent().

1258 {
1259  quit_threads_=true;
1260  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1261 
1262 }

Friends And Related Function Documentation

friend struct InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend struct InputFile
friend

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::currentLumiSection_
private

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 172 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 87 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private

unsigned int FedRawDataInputSource::eventChunkSize_
private

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 176 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 164 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private

evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 182 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

unsigned int FedRawDataInputSource::readBlocks_
private

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and FedRawDataInputSource().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private

std::mutex FedRawDataInputSource::startupLock_
private

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private