CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
virtual void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
virtual void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliaryluminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliaryreadLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

bool checkNextEvent () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 
ProductRegistryproductRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFilecurrentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinitiondpd_
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< InputFile * > fileQueue_
 
std::list< std::pair< int, InputFile * > > filesToDelete_
 
evf::FastMonitoringServicefms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 41 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 134 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 56 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, MillePedeFileConverter_cfg::e, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, workerJob_, and workerThreads_.

57  :
59  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
60  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
61  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
62  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
63  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
64  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
65  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
66  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
67  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
68  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
69  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
70  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
74  eventID_(),
77  tcds_pointer_(nullptr),
78  eventsThisLumi_(0),
79  dpd_(nullptr)
80 {
81  char thishost[256];
82  gethostname(thishost, 255);
83  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
84  << std::endl << (eventChunkSize_/1048576)
85  << " MB on host " << thishost;
86 
87  long autoRunNumber = -1;
88  if (fileListMode_) {
89  autoRunNumber = initFileList();
90  if (!fileListLoopMode_) {
91  if (autoRunNumber<0)
92  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
93  //override run number
94  runNumber_ = (edm::RunNumber_t)autoRunNumber;
95  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
96  }
97  }
98 
100  setNewRun();
101  //todo:autodetect from file name (assert if names differ)
104 
105  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
106  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
107  struct stat statbuf;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
110  if (stat(defPath_.c_str(), &statbuf)) {
111  defPath_ = defPathSuffix;
112  }
113  }
114 
115  dpd_ = new DataPointDefinition();
116  std::string defLabel = "data";
117  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
118 
119  //make sure that chunk size is N * block size
124 
125  if (!numBuffers_)
126  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
127  "no reading enabled with numBuffers parameter 0";
128 
132 
133  if (!crc32c_hw_test())
134  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
135 
136  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
137  if (fileListMode_) {
138  try {
140  } catch(cms::Exception e) {
141  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
142  }
143  }
144  else {
146  if (!fms_) {
147  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
148  }
149  }
150 
152  if (!daqDirector_)
153  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
154 
155  //set DaqDirector to delete files in preGlobalEndLumi callback
157  if (fms_) {
159  fms_->setInputSource(this);
162  }
163  //should delete chunks when run stops
164  for (unsigned int i=0;i<numBuffers_;i++) {
166  }
167 
168  quit_threads_ = false;
169 
170  for (unsigned int i=0;i<numConcurrentReads_;i++)
171  {
172  std::unique_lock<std::mutex> lk(startupLock_);
173  //issue a memory fence here and in threads (constructor was segfaulting without this)
174  thread_quit_signal.push_back(false);
175  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
176  cvReader_.push_back(new std::condition_variable);
177  tid_active_.push_back(0);
178  threadInit_.store(false,std::memory_order_release);
179  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
180  startupCv_.wait(lk);
181  }
182 
183  runAuxiliary()->setProcessHistoryID(processHistoryID_);
184 }
std::vector< std::string > fileNames_
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:339
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
bool crc32c_hw_test()
Definition: crc32c.cc:354
jsoncollector::DataPointDefinition * dpd_
void setInState(FastMonitoringThread::InputState inputState)
void setInStateSup(FastMonitoringThread::InputState inputState)
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
std::list< std::pair< int, InputFile * > > filesToDelete_
def getRunNumber(filename)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:250
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:340
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:342
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 186 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, mps_fire::i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

187 {
188  quit_threads_=true;
189 
190  //delete any remaining open files
191  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
192  deleteFile(it->second->fileName_);
193  delete it->second;
194  }
196  readSupervisorThread_->join();
197  }
198  else {
199  //join aux threads in case the supervisor thread was not started
200  for (unsigned int i=0;i<workerThreads_.size();i++) {
201  std::unique_lock<std::mutex> lk(mReader_);
202  thread_quit_signal[i]=true;
203  cvReader_[i]->notify_one();
204  lk.unlock();
205  workerThreads_[i]->join();
206  delete workerThreads_[i];
207  }
208  }
209  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
210  /*
211  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
212  InputChunk *ch;
213  while (!freeChunks_.try_pop(ch)) {}
214  delete ch;
215  }
216  */
217 }
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
std::vector< unsigned int > thread_quit_signal
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 236 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, runEdmFileComparison::found, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

237 {
239  {
240  //late init of directory variable
242 
243  //this thread opens new files and dispatches reading to worker readers
244  //threadInit_.store(false,std::memory_order_release);
245  std::unique_lock<std::mutex> lk(startupLock_);
246  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
248  startupCv_.wait(lk);
249  }
250  //signal hltd to start event accounting
254  switch (nextEvent() ) {
256  //maybe create EoL file in working directory before ending run
257  struct stat buf;
258  if ( currentLumiSection_ > 0) {
259  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
260  if (eolFound) {
262  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
263  if ( !found ) {
265  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
266  close(eol_fd);
268  }
269  }
270  }
271  //also create EoR file in FU data directory
272  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
273  if (!eorFound) {
274  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_=0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return false;
282  }
284  //this is not reachable
285  return true;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return true;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_=0;
297  maybeOpenNewLumiSection( event_->lumi() );
298  }
299  }
302  else
303  eventRunNumber_=event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return true;
309  }
310  }
311 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void createProcessingNotificationMaybe() const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:371
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:354
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 313 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), trackingPlots::stat, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

314 {
315  //used for backpressure mechanisms and monitoring
316  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
317  struct stat buf;
318  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
319  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
320  close(bol_fd);
321  }
322 }
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 638 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, callgraph::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

639 {
640  //no deletion in file list mode
641  if (fileListMode_) return;
642 
643  const boost::filesystem::path filePath(fileName);
644  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
645  try {
646  //sometimes this fails but file gets deleted
647  boost::filesystem::remove(filePath);
648  }
649  catch (const boost::filesystem::filesystem_error& ex)
650  {
651  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
652  << ". Trying again.";
653  usleep(100000);
654  try {
655  boost::filesystem::remove(filePath);
656  }
657  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
658  }
659  catch (std::exception& ex)
660  {
661  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
662  << ". Trying again.";
663  usleep(100000);
664  try {
665  boost::filesystem::remove(filePath);
666  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
667  }
668 }
#define LogDebug(id)
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:209
bool FedRawDataInputSource::exceptionState ( )
inlineprivate
void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 219 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

220 {
222  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
223  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
224  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
225  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
226  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
227  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
228  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
229  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
230  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
231  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
232  desc.setAllowAnything();
233  descriptions.add("source", desc);
234 }
void setComment(std::string const &value)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 745 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), l1t::stage2::layer2::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, tcds_pointer_, and ntuplemaker::time.

Referenced by read().

746 {
748  timeval stv;
749  gettimeofday(&stv,nullptr);
750  time = stv.tv_sec;
751  time = (time << 32) + stv.tv_usec;
752  edm::Timestamp tstamp(time);
753 
754  uint32_t eventSize = event_->eventSize();
755  char* event = (char*)event_->payload();
756  GTPEventID_=0;
757  tcds_pointer_ = nullptr;
758  while (eventSize > 0) {
759  assert(eventSize>=sizeof(fedt_t));
760  eventSize -= sizeof(fedt_t);
761  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
762  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
763  assert(eventSize>=fedSize - sizeof(fedt_t));
764  eventSize -= (fedSize - sizeof(fedt_t));
765  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
766  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
767  if(fedId>FEDNumbering::MAXFEDID)
768  {
769  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
770  }
771  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
772  tcds_pointer_ = (unsigned char *)(event + eventSize );
773  }
774  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
775  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
776  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
777  else
778  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
779  //evf::evtn::evm_board_setformat(fedSize);
780  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
781  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
782  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
783  }
784  //take event ID from GTPE FED
785  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
786  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
787  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
788  }
789  }
790  FEDRawData& fedData = rawData.FEDData(fedId);
791  fedData.resize(fedSize);
792  memcpy(fedData.data(), event + eventSize, fedSize);
793  }
794  assert(eventSize == 0);
795 
796  return tstamp;
797 }
unsigned int getgpshigh(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
struct fedt_struct fedt_t
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
Definition: event.py:1
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1384 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1385 {
1386  std::lock_guard<std::mutex> lock(monlock_);
1387  auto itr = sourceEventsReport_.find(lumi);
1388  if (itr!=sourceEventsReport_.end()) {
1389  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1390  if (erase)
1391  sourceEventsReport_.erase(itr);
1392  return ret;
1393  }
1394  else
1395  return std::pair<bool,unsigned int>(false,0);
1396 }
std::map< unsigned int, unsigned int > sourceEventsReport_
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1427 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, callgraph::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1428 {
1429  if (fileListIndex_ < fileNames_.size()) {
1430  nextFile = fileNames_[fileListIndex_];
1431  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1432  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1433  boost::filesystem::path fileName = nextFile;
1434  std::string fileStem = fileName.stem().string();
1435  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1436  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1437 
1438  if (!fileListLoopMode_)
1439  ls = boost::lexical_cast<unsigned int>(fileStem);
1440  else //always starting from LS 1 in loop mode
1441  ls = 1 + loopModeIterationInc_;
1442 
1443  //fsize = 0;
1444  //lockWaitTime = 0;
1445  fileListIndex_++;
1447  }
1448  else {
1449  if (!fileListLoopMode_)
1451  else {
1452  //loop through files until interrupted
1454  fileListIndex_=0;
1455  return getFile(ls,nextFile,fsize,lockWaitTime);
1456  }
1457  }
1458 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls(path, rec=False)
Definition: eostools.py:348
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 375 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, threadError(), mps_check::timeout, verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

376 {
377 
379  if (!currentFile_)
380  {
383  if (!fileQueue_.try_pop(currentFile_))
384  {
385  //sleep until wakeup (only in single-buffer mode) or timeout
386  std::unique_lock<std::mutex> lkw(mWakeup_);
387  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
389  }
390  status = currentFile_->status_;
391  if ( status == evf::EvFDaqDirector::runEnded)
392  {
394  delete currentFile_;
395  currentFile_=nullptr;
396  return status;
397  }
398  else if ( status == evf::EvFDaqDirector::runAbort)
399  {
400  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
401  }
402  else if (status == evf::EvFDaqDirector::newLumi)
403  {
405  if (getLSFromFilename_) {
408  eventsThisLumi_=0;
410  }
411  }
412  else {//let this be picked up from next event
414  }
415 
416  delete currentFile_;
417  currentFile_=nullptr;
418  return status;
419  }
420  else if (status == evf::EvFDaqDirector::newFile) {
422  }
423  else
424  assert(0);
425  }
427 
428  //file is empty
429  if (!currentFile_->fileSize_) {
431  //try to open new lumi
432  assert(currentFile_->nChunks_==0);
433  if (getLSFromFilename_)
436  eventsThisLumi_=0;
438  }
439  //immediately delete empty file
441  delete currentFile_;
442  currentFile_=nullptr;
444  }
445 
446  //file is finished
449  //release last chunk (it is never released elsewhere)
452  {
453  throw cms::Exception("FedRawDataInputSource::getNextEvent")
454  << "Fully processed " << currentFile_->nProcessed_
455  << " from the file " << currentFile_->fileName_
456  << " but according to BU JSON there should be "
457  << currentFile_->nEvents_ << " events";
458  }
459  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
460  if (singleBufferMode_) {
461  std::unique_lock<std::mutex> lkw(mWakeup_);
462  cvWakeup_.notify_one();
463  }
466  //put the file in pending delete list;
467  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
468  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
469  }
470  else {
471  //in single-thread and stream jobs, events are already processed
473  delete currentFile_;
474  }
475  currentFile_=nullptr;
477  }
478 
479 
480  //file is too short
482  {
483  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
484  "Premature end of input file while reading event header";
485  }
486  if (singleBufferMode_) {
487 
488  //should already be there
491  usleep(10000);
493  }
495 
496  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
497 
498  //conditions when read amount is not sufficient for the header to fit
499  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
501  {
503 
504  if (detectedFRDversion_==0) {
505  detectedFRDversion_=*((uint32*)dataPosition);
506  if (detectedFRDversion_>5)
507  throw cms::Exception("FedRawDataInputSource::getNextEvent")
508  << "Unknown FRD version -: " << detectedFRDversion_;
509  assert(detectedFRDversion_>=1);
510  }
511 
512  //recalculate chunk position
513  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
514  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
515  {
516  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
517  "Premature end of input file while reading event header";
518  }
519  }
520 
521  event_.reset( new FRDEventMsgView(dataPosition) );
522  if (event_->size()>eventChunkSize_) {
523  throw cms::Exception("FedRawDataInputSource::getNextEvent")
524  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
525  << " run:" << event_->run() << " of size:" << event_->size()
526  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
527  }
528 
529  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
530 
532  {
533  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
534  "Premature end of input file while reading event data";
535  }
536  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
538  //recalculate chunk position
539  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
540  event_.reset( new FRDEventMsgView(dataPosition) );
541  }
542  currentFile_->bufferPosition_ += event_->size();
543  currentFile_->chunkPosition_ += event_->size();
544  //last chunk is released when this function is invoked next time
545 
546  }
547  //multibuffer mode:
548  else
549  {
550  //wait for the current chunk to become added to the vector
553  usleep(10000);
555  }
557 
558  //check if header is at the boundary of two chunks
559  chunkIsFree_ = false;
560  unsigned char *dataPosition;
561 
562  //read header, copy it to a single chunk if necessary
563  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
564 
565  event_.reset( new FRDEventMsgView(dataPosition) );
566  if (event_->size()>eventChunkSize_) {
567  throw cms::Exception("FedRawDataInputSource::getNextEvent")
568  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
569  << " run:" << event_->run() << " of size:" << event_->size()
570  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
571  }
572 
573  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
574 
576  {
577  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
578  "Premature end of input file while reading event data";
579  }
580 
581  if (chunkEnd) {
582  //header was at the chunk boundary, we will have to move payload as well
583  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
584  chunkIsFree_ = true;
585  }
586  else {
587  //header was contiguous, but check if payload fits the chunk
588  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
589  //rewind to header start position
590  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
591  //copy event to a chunk start and move pointers
592  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
593  assert(chunkEnd);
594  chunkIsFree_=true;
595  //header is moved
596  event_.reset( new FRDEventMsgView(dataPosition) );
597  }
598  else {
599  //everything is in a single chunk, only move pointers forward
600  chunkEnd = currentFile_->advance(dataPosition,msgSize);
601  assert(!chunkEnd);
602  chunkIsFree_=false;
603  }
604  }
605  }//end multibuffer mode
607 
608  if (verifyChecksum_ && event_->version() >= 5)
609  {
610  uint32_t crc=0;
611  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
612  if ( crc != event_->crc32c() ) {
614  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
615  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
616  " but calculated 0x" << crc;
617  }
618  }
619  else if ( verifyAdler32_ && event_->version() >= 3)
620  {
621  uint32_t adler = adler32(0L,Z_NULL,0);
622  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
623 
624  if ( adler != event_->adler32() ) {
626  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
627  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
628  " but calculated 0x" << adler;
629  }
630  }
632 
634 
636 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
evf::EvFDaqDirector::FileStatus status_
int timeout
Definition: mps_check.py:51
FedRawDataInputSource * parent_
uint32_t bufferPosition_
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 799 of file FedRawDataInputSource.cc.

References popcon2dropbox::copy(), daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, Json::Reader::parse(), callgraph::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

800 {
802  try {
803  // assemble json destination path
805 
806  //TODO:should be ported to use fffnaming
807  std::ostringstream fileNameWithPID;
808  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
809  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
810  jsonDestPath /= fileNameWithPID.str();
811 
812  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
813  << jsonDestPath;
814  try {
815  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
816  }
817  catch (const boost::filesystem::filesystem_error& ex)
818  {
819  // Input dir gone?
820  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
821  // << " Maybe the file is not yet visible by FU. Trying again in one second";
822  sleep(1);
823  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
824  }
826 
827  try {
828  //sometimes this fails but file gets deleted
829  boost::filesystem::remove(jsonSourcePath);
830  }
831  catch (const boost::filesystem::filesystem_error& ex)
832  {
833  // Input dir gone?
834  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
835  }
836  catch (std::exception& ex)
837  {
838  // Input dir gone?
839  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
840  }
841 
842  boost::filesystem::ifstream ij(jsonDestPath);
843  Json::Value deserializeRoot;
845 
846  std::stringstream ss;
847  ss << ij.rdbuf();
848  if (!reader.parse(ss.str(), deserializeRoot)) {
849  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
850  << "\nERROR:\n" << reader.getFormatedErrorMessages()
851  << "CONTENT:\n" << ss.str()<<".";
852  throw std::runtime_error("Cannot deserialize input JSON file");
853  }
854 
855  //read BU JSON
857  DataPoint dp;
858  dp.deserialize(deserializeRoot);
859  bool success = false;
860  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
861  if (dpd_->getNames().at(i)=="NEvents")
862  if (i<dp.getData().size()) {
863  data = dp.getData()[i];
864  success=true;
865  }
866  }
867  if (!success) {
868  if (!dp.getData().empty())
869  data = dp.getData()[0];
870  else
871  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
872  " error reading number of events from BU JSON -: No input value " << data;
873  }
874  return boost::lexical_cast<int>(data);
875 
876  }
877  catch (const boost::filesystem::filesystem_error& ex)
878  {
879  // Input dir gone?
881  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
882  }
883  catch (std::runtime_error e)
884  {
885  // Another process grabbed the file and NFS did not register this
887  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
888  }
889 
890  catch( boost::bad_lexical_cast const& ) {
891  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
892  << "Input value is -: " << data;
893  }
894 
895  catch (std::exception e)
896  {
897  // BU run directory disappeared?
899  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
900  }
901 
902  return -1;
903 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
auto dp
Definition: deltaR.h:22
void deserialize(Json::Value &root) override
Definition: DataPoint.cc:56
def remove(d, key, TELL=False)
Definition: MatrixUtil.py:209
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1398 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, callgraph::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1399 {
1400  std::sort(fileNames_.begin(),fileNames_.end(),
1401  [](std::string a, std::string b) {
1402  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1403  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1404  return b > a;});
1405 
1406  if (!fileNames_.empty()) {
1407  //get run number from first file in the vector
1409  std::string fileStem = fileName.stem().string();
1410  auto end = fileStem.find("_");
1411  if (fileStem.find("run")==0) {
1412  std::string runStr = fileStem.substr(3,end-3);
1413  try {
1414  //get long to support test run numbers < 2^32
1415  long rval = boost::lexical_cast<long>(runStr);
1416  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1417  return rval;
1418  }
1419  catch( boost::bad_lexical_cast const& ) {
1420  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1421  }
1422  }
1423  }
1424  return -1;
1425 }
std::vector< std::string > fileNames_
#define end
Definition: vmac.h:37
unsigned long long int rval
Definition: vlib.h:22
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 324 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, runEdmFileComparison::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

325 {
327  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
328 
329  if ( currentLumiSection_ > 0) {
330  const std::string fuEoLS =
332  struct stat buf;
333  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
334  if ( !found ) {
336  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
337  close(eol_fd);
338  createBoLSFile(lumiSection,false);
340  }
341  }
342  else createBoLSFile(lumiSection,true);//needed for initial lumisection
343 
344  currentLumiSection_ = lumiSection;
345 
347 
348  timeval tv;
349  gettimeofday(&tv, nullptr);
350  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
351 
352  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
354  runAuxiliary()->run(),
355  lumiSection, lsopentime,
357 
358  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
359  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
360 
361  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
362  }
363 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:504
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:346
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:510
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:250
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:354
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:253
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 365 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNextEvent().

366 {
368  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
369  {
370  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
371  }
372  return status;
373 }
volatile std::atomic< bool > shutdown_flag
def load(fileName)
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 671 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTracker_, edm::EventPrincipal::streamID(), tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

672 {
674  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
675  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
676 
677  if (useL1EventID_){
679  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
681  aux.setProcessHistoryID(processHistoryID_);
682  makeEvent(eventPrincipal, aux);
683  }
684  else if(tcds_pointer_==nullptr){
685  assert(GTPEventID_);
687  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
689  aux.setProcessHistoryID(processHistoryID_);
690  makeEvent(eventPrincipal, aux);
691  }
692  else{
693  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_);
698  makeEvent(eventPrincipal, aux);
699  }
700 
701 
702 
703  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
704 
705  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
707 
708  eventsThisLumi_++;
710 
711  //resize vector if needed
712  while (streamFileTracker_.size() <= eventPrincipal.streamID())
713  streamFileTracker_.push_back(-1);
714 
716 
717  //this old file check runs no more often than every 10 events
718  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
719  //delete files that are not in processing
720  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
721  auto it = filesToDelete_.begin();
722  while (it!=filesToDelete_.end()) {
723  bool fileIsBeingProcessed = false;
724  for (unsigned int i=0;i<nStreams_;i++) {
725  if (it->first == streamFileTracker_.at(i)) {
726  fileIsBeingProcessed = true;
727  break;
728  }
729  }
730  if (!fileIsBeingProcessed) {
731  deleteFile(it->second->fileName_);
732  delete it->second;
733  it = filesToDelete_.erase(it);
734  }
735  else it++;
736  }
737 
738  }
740  chunkIsFree_=false;
742  return;
743 }
void setInState(FastMonitoringThread::InputState inputState)
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:209
Definition: TCDSRaw.h:18
std::vector< int > streamFileTracker_
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
def move(src, dest)
Definition: eostools.py:510
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1319 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, mps_fire::i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1320 {
1321 
1322  if (fileDescriptor_<0) {
1323  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1324  bufferInputRead_ = 0;
1325  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1326  if (fileDescriptor_>=0)
1327  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1328  else
1329  {
1330  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1331  << file->fileName_ << " fd:" << fileDescriptor_;
1332  }
1333  }
1334 
1335  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1336  uint32_t existingSize = 0;
1337  for (unsigned int i=0;i<readBlocks_;i++)
1338  {
1339  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1340  bufferInputRead_+=last;
1341  existingSize+=last;
1342  }
1343  }
1344  else {
1345  const uint32_t chunksize = file->chunkPosition_;
1346  const uint32_t blockcount=chunksize/eventChunkBlock_;
1347  const uint32_t leftsize = chunksize%eventChunkBlock_;
1348  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1349  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1350 
1351  for (uint32_t i=0;i<blockcount;i++) {
1352  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1353  bufferInputRead_+=last;
1354  existingSize+=last;
1355  }
1356  if (leftsize) {
1357  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1358  bufferInputRead_+=last;
1359  }
1360  file->chunkPosition_=0;//data was moved to beginning of the chunk
1361  }
1362  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1363  if (fileDescriptor_!=-1)
1364  {
1365  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1366  close(fileDescriptor_);
1367  fileDescriptor_=-1;
1368  }
1369  }
1370 }
#define LogDebug(id)
uint32_t chunkPosition_
void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 909 of file FedRawDataInputSource.cc.

References InputFile::chunks_, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), mps_fire::i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, callgraph::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, trackingPlots::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

910 {
911  bool stop=false;
912  unsigned int currentLumiSection = 0;
913  //threadInit_.exchange(true,std::memory_order_acquire);
914 
915  {
916  std::unique_lock<std::mutex> lk(startupLock_);
917  startupCv_.notify_one();
918  }
919 
920  uint32_t ls=0;
921  uint32_t monLS=1;
922  uint32_t lockCount=0;
923  uint64_t sumLockWaitTimeUs=0.;
924 
925  while (!stop) {
926 
927  //wait for at least one free thread and chunk
928  int counter=0;
930  {
931  //report state to monitoring
932  if (fms_) {
933  bool copy_active=false;
934  for (auto j : tid_active_) if (j) copy_active=true;
936  else if (freeChunks_.empty()) {
937  if (copy_active)
939  else
941  }
942  else {
943  if (copy_active)
945  else
947  }
948  }
949  std::unique_lock<std::mutex> lkw(mWakeup_);
950  //sleep until woken up by condition or a timeout
951  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
952  counter++;
953  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
954  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
955  }
956  else {
957  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
958  }
959  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
960  }
961 
962  if (stop) break;
963 
964  //look for a new file
965  std::string nextFile;
966  uint32_t fileSize;
967 
968  if (fms_) {
972  }
973 
975 
976  while (status == evf::EvFDaqDirector::noFile) {
977  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
978  stop=true;
979  break;
980  }
981 
982  uint64_t thisLockWaitTimeUs=0.;
983  if (fileListMode_) {
984  //return LS if LS not set, otherwise return file
985  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
986  }
987  else
988  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
989 
991 
992  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
993 
994  //monitoring of lock wait time
995  if (thisLockWaitTimeUs>0.)
996  sumLockWaitTimeUs+=thisLockWaitTimeUs;
997  lockCount++;
998  if (ls>monLS) {
999  monLS=ls;
1000  if (lockCount)
1001  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1002  lockCount=0;
1003  sumLockWaitTimeUs=0;
1004  }
1005 
1006  //check again for any remaining index/EoLS files after EoR file is seen
1007  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1009  usleep(100000);
1010  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1011  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1012  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1013  }
1014 
1015  if ( status == evf::EvFDaqDirector::runEnded) {
1017  stop=true;
1018  break;
1019  }
1020 
1021  //queue new lumisection
1022  if( getLSFromFilename_ && ls > currentLumiSection) {
1023  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1024  currentLumiSection = ls;
1025  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1026  }
1027 
1028  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1029  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1031  stop=true;
1032  break;
1033  }
1034 
1035  int dbgcount=0;
1036  if (status == evf::EvFDaqDirector::noFile) {
1038  dbgcount++;
1039  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1040  usleep(100000);
1041  }
1042  }
1043  if ( status == evf::EvFDaqDirector::newFile ) {
1045  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1046 
1047 
1048  boost::filesystem::path rawFilePath(nextFile);
1049  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1050 
1051  struct stat st;
1052  int stat_res = stat(rawFile.c_str(),&st);
1053  if (stat_res==-1) {
1054  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1055  setExceptionState_=true;
1056  break;
1057  }
1058  fileSize=st.st_size;
1059 
1060  if (fms_) {
1064  }
1065  int eventsInNewFile;
1066  if (fileListMode_) {
1067  if (fileSize==0) eventsInNewFile=0;
1068  else eventsInNewFile=-1;
1069  }
1070  else {
1071  eventsInNewFile = grabNextJsonFile(nextFile);
1072  assert( eventsInNewFile>=0 );
1073  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1074  }
1075 
1076  if (!singleBufferMode_) {
1077  //calculate number of needed chunks
1078  unsigned int neededChunks = fileSize/eventChunkSize_;
1079  if (fileSize%eventChunkSize_) neededChunks++;
1080 
1081  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1083  fileQueue_.push(newInputFile);
1084 
1085  for (unsigned int i=0;i<neededChunks;i++) {
1086 
1087  if (fms_) {
1088  bool copy_active=false;
1089  for (auto j : tid_active_) if (j) copy_active=true;
1090  if (copy_active)
1092  else
1094  }
1095  //get thread
1096  unsigned int newTid = 0xffffffff;
1097  while (!workerPool_.try_pop(newTid)) {
1098  usleep(100000);
1099  }
1100 
1101  if (fms_) {
1102  bool copy_active=false;
1103  for (auto j : tid_active_) if (j) copy_active=true;
1104  if (copy_active)
1106  else
1108  }
1109  InputChunk * newChunk = nullptr;
1110  while (!freeChunks_.try_pop(newChunk)) {
1111  usleep(100000);
1112  if (quit_threads_.load(std::memory_order_relaxed)) break;
1113  }
1114 
1115  if (newChunk == nullptr) {
1116  //return unused tid if we received shutdown (nullptr chunk)
1117  if (newTid!=0xffffffff) workerPool_.push(newTid);
1118  stop = true;
1119  break;
1120  }
1122 
1123  std::unique_lock<std::mutex> lk(mReader_);
1124 
1125  unsigned int toRead = eventChunkSize_;
1126  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1127  newChunk->reset(i*eventChunkSize_,toRead,i);
1128 
1129  workerJob_[newTid].first=newInputFile;
1130  workerJob_[newTid].second=newChunk;
1131 
1132  //wake up the worker thread
1133  cvReader_[newTid]->notify_one();
1134  }
1135  }
1136  else {
1137  if (!eventsInNewFile) {
1138  //still queue file for lumi update
1139  std::unique_lock<std::mutex> lkw(mWakeup_);
1140  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1142  fileQueue_.push(newInputFile);
1143  cvWakeup_.notify_one();
1144  break;
1145  }
1146  //in single-buffer mode put single chunk in the file and let the main thread read the file
1147  InputChunk * newChunk;
1148  //should be available immediately
1149  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1150 
1151  std::unique_lock<std::mutex> lkw(mWakeup_);
1152 
1153  unsigned int toRead = eventChunkSize_;
1154  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1155  newChunk->reset(0,toRead,0);
1156  newChunk->readComplete_=true;
1157 
1158  //push file and wakeup main thread
1159  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1160  newInputFile->chunks_[0]=newChunk;
1162  fileQueue_.push(newInputFile);
1163  cvWakeup_.notify_one();
1164  }
1165  }
1166  }
1168  //make sure threads finish reading
1169  unsigned numFinishedThreads = 0;
1170  while (numFinishedThreads < workerThreads_.size()) {
1171  unsigned int tid;
1172  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1173  std::unique_lock<std::mutex> lk(mReader_);
1174  thread_quit_signal[tid]=true;
1175  cvReader_[tid]->notify_one();
1176  numFinishedThreads++;
1177  }
1178  for (unsigned int i=0;i<workerThreads_.size();i++) {
1179  workerThreads_[i]->join();
1180  delete workerThreads_[i];
1181  }
1182 }
#define LogDebug(id)
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
volatile std::atomic< bool > shutdown_flag
void setInStateSup(FastMonitoringThread::InputState inputState)
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
int timeout
Definition: mps_check.py:51
std::vector< std::condition_variable * > cvReader_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
int grabNextJsonFile(boost::filesystem::path const &)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
def ls(path, rec=False)
Definition: eostools.py:348
unsigned long long uint64_t
Definition: Time.h:15
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1184 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, FrontierConditions_GlobalTag_cff::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, mps_fire::i, init, plotBeamSpotDB::last, LogDebug, mReader_, cmsPerfSuiteHarvest::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1185 {
1186  bool init = true;
1187  threadInit_.exchange(true,std::memory_order_acquire);
1188 
1189  while (true) {
1190 
1191  tid_active_[tid]=false;
1192  std::unique_lock<std::mutex> lk(mReader_);
1193  workerJob_[tid].first=nullptr;
1194  workerJob_[tid].first=nullptr;
1195 
1196  assert(!thread_quit_signal[tid]);//should never get it here
1197  workerPool_.push(tid);
1198 
1199  if (init) {
1200  std::unique_lock<std::mutex> lk(startupLock_);
1201  init = false;
1202  startupCv_.notify_one();
1203  }
1204  cvReader_[tid]->wait(lk);
1205 
1206  if (thread_quit_signal[tid]) return;
1207  tid_active_[tid]=true;
1208 
1209  InputFile * file;
1210  InputChunk * chunk;
1211 
1212  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1213 
1214  file = workerJob_[tid].first;
1215  chunk = workerJob_[tid].second;
1216 
1217  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1218  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1219 
1220 
1221  if (fileDescriptor>=0)
1222  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1223  else
1224  {
1225  edm::LogError("FedRawDataInputSource") <<
1226  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1227  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1228  setExceptionState_=true;
1229  return;
1230 
1231  }
1232 
1233  unsigned int bufferLeft = 0;
1235  for (unsigned int i=0;i<readBlocks_;i++)
1236  {
1237  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1238  if ( last > 0 )
1239  bufferLeft+=last;
1240  if (last < eventChunkBlock_) {
1241  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1242  break;
1243  }
1244  }
1246  auto diff = end-start;
1247  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1248  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1249  close(fileDescriptor);
1250 
1251  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1252  assert(detectedFRDversion_<=5);
1253  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1254  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1255 
1256  }
1257 }
#define LogDebug(id)
Definition: start.py:1
void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
int init
Definition: HydjetWrapper.h:67
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
std::vector< unsigned int > thread_quit_signal
std::vector< unsigned int > tid_active_
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1373 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1374 {
1375 
1376  std::lock_guard<std::mutex> lock(monlock_);
1377  auto itr = sourceEventsReport_.find(lumi);
1378  if (itr!=sourceEventsReport_.end())
1379  itr->second+=events;
1380  else
1382 }
std::map< unsigned int, unsigned int > sourceEventsReport_
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::InputSource.

Definition at line 905 of file FedRawDataInputSource.cc.

906 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1259 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by getNextEvent().

1260 {
1261  quit_threads_=true;
1262  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1263 
1264 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend struct InputChunk
friend

Definition at line 44 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend struct InputFile
friend

Definition at line 43 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 85 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private
std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 162 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and FedRawDataInputSource().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private