FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

bool checkNextEvent () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks the current event as cached, so that eventCached() returns true. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< InputFile * > fileQueue_
 
std::list< std::pair< int, InputFile * > > filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 41 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair< InputFile *, InputChunk * > FedRawDataInputSource::ReaderInfo
private

Definition at line 134 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 56 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, workerJob_, and workerThreads_.

57  :
59  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
60  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
61  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
62  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
63  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
64  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
65  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
66  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
67  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
68  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
69  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
70  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
74  eventID_(),
77  tcds_pointer_(nullptr),
78  eventsThisLumi_(0),
79  dpd_(nullptr)
80 {
81  char thishost[256];
82  gethostname(thishost, 255);
83  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
84  << std::endl << (eventChunkSize_/1048576)
85  << " MB on host " << thishost;
86 
87  long autoRunNumber = -1;
88  if (fileListMode_) {
89  autoRunNumber = initFileList();
90  if (!fileListLoopMode_) {
91  if (autoRunNumber<0)
92  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
93  //override run number
94  runNumber_ = (edm::RunNumber_t)autoRunNumber;
95  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
96  }
97  }
98 
100  setNewRun();
101  //todo:autodetect from file name (assert if names differ)
104 
105  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
106  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
107  struct stat statbuf;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
110  if (stat(defPath_.c_str(), &statbuf)) {
111  defPath_ = defPathSuffix;
112  }
113  }
114 
115  dpd_ = new DataPointDefinition();
116  std::string defLabel = "data";
117  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
118 
119  //make sure that chunk size is N * block size
124 
125  if (!numBuffers_)
126  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
127  "no reading enabled with numBuffers parameter 0";
128 
132 
133  if (!crc32c_hw_test())
134  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
135 
136  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
137  if (fileListMode_) {
138  try {
140  } catch(cms::Exception e) {
141  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
142  }
143  }
144  else {
146  if (!fms_) {
147  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
148  }
149  }
150 
152  if (!daqDirector_)
153  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
154 
155  //set DaqDirector to delete files in preGlobalEndLumi callback
157  if (fms_) {
159  fms_->setInputSource(this);
162  }
163  //should delete chunks when run stops
164  for (unsigned int i=0;i<numBuffers_;i++) {
166  }
167 
168  quit_threads_ = false;
169 
170  for (unsigned int i=0;i<numConcurrentReads_;i++)
171  {
172  std::unique_lock<std::mutex> lk(startupLock_);
173  //issue a memory fence here and in threads (constructor was segfaulting without this)
174  thread_quit_signal.push_back(false);
175  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
176  cvReader_.push_back(new std::condition_variable);
177  tid_active_.push_back(0);
178  threadInit_.store(false,std::memory_order_release);
179  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
180  startupCv_.wait(lk);
181  }
182 
183  runAuxiliary()->setProcessHistoryID(processHistoryID_);
184 }
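
The worker start-up loop at the end of the constructor launches one reader thread at a time and blocks on startupCv_ until that thread has reported in. A minimal sketch of this handshake follows. It is not the class's implementation: StartupGate, workerMain and startWorkers are hypothetical names, and the sketch waits with a predicate (an assumption made here to guard against spurious wake-ups) where the listing calls wait(lk) directly.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the startupLock_/startupCv_ pair.
struct StartupGate {
  std::mutex lock;
  std::condition_variable cv;
  unsigned int started = 0;  // number of workers that have reported in
};

// Worker body: report readiness under the lock, then return.
// A real reader thread would go on to wait for read jobs.
void workerMain(StartupGate& gate) {
  {
    std::lock_guard<std::mutex> lk(gate.lock);
    ++gate.started;
  }
  gate.cv.notify_one();
}

// Launch n workers one at a time, waiting for each to report in
// before starting the next. Returns how many workers reported.
unsigned int startWorkers(unsigned int n) {
  StartupGate gate;
  std::vector<std::thread> workers;
  for (unsigned int i = 0; i < n; ++i) {
    std::unique_lock<std::mutex> lk(gate.lock);
    workers.emplace_back(workerMain, std::ref(gate));
    // Predicate form: returns only once worker i has bumped the counter.
    gate.cv.wait(lk, [&] { return gate.started == i + 1; });
  }
  for (auto& t : workers) t.join();
  assert(gate.started == n);
  return gate.started;
}
```

Because the launching thread holds the lock while it creates the worker, the worker cannot signal before the launcher is waiting, so no notification is lost.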
FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 186 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

187 {
188  quit_threads_=true;
189 
190  //delete any remaining open files
191  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
192  deleteFile(it->second->fileName_);
193  delete it->second;
194  }
196  readSupervisorThread_->join();
197  }
198  else {
199  //join aux threads in case the supervisor thread was not started
200  for (unsigned int i=0;i<workerThreads_.size();i++) {
201  std::unique_lock<std::mutex> lk(mReader_);
202  thread_quit_signal[i]=true;
203  cvReader_[i]->notify_one();
204  lk.unlock();
205  workerThreads_[i]->join();
206  delete workerThreads_[i];
207  }
208  }
209  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
210  /*
211  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
212  InputChunk *ch;
213  while (!freeChunks_.try_pop(ch)) {}
214  delete ch;
215  }
216  */
217 }
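
When the supervisor thread was never started, the destructor shuts down each worker by setting its quit flag under the reader mutex, notifying its condition variable, and joining it. The same sequence can be sketched in isolation as below. QuietWorker is a hypothetical class written for illustration and it owns its thread by value, whereas the listing keeps raw std::thread pointers.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker that sleeps until it is told to quit.
class QuietWorker {
public:
  QuietWorker() : thread_([this] { run(); }) {}
  ~QuietWorker() { stop(); }

  // Set the quit flag under the mutex, wake the worker, then join it.
  // Safe to call more than once.
  void stop() {
    {
      std::lock_guard<std::mutex> lk(mutex_);
      quit_ = true;
    }
    cv_.notify_one();
    if (thread_.joinable()) thread_.join();
  }

  // Only meaningful after stop() has returned.
  bool finished() const { return finished_; }

private:
  void run() {
    std::unique_lock<std::mutex> lk(mutex_);
    cv_.wait(lk, [this] { return quit_; });
    finished_ = true;
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  bool quit_ = false;
  bool finished_ = false;  // written by the worker, read after join()
  std::thread thread_;     // declared last so the members above exist first
};
```

Setting the flag while holding the mutex ensures the worker either sees the flag before it sleeps or is already waiting when the notification arrives.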

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 236 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, and evf::EvFDaqDirector::unlockFULocal2().

237 {
239  {
240  //late init of directory variable
242 
243  //this thread opens new files and dispatches reading to worker readers
244  //threadInit_.store(false,std::memory_order_release);
245  std::unique_lock<std::mutex> lk(startupLock_);
246  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
248  startupCv_.wait(lk);
249  }
250  //signal hltd to start event accounting
251  if (!currentLumiSection_)
254  switch (nextEvent() ) {
256  //maybe create EoL file in working directory before ending run
257  struct stat buf;
258  if ( currentLumiSection_ > 0) {
259  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
260  if (eolFound) {
262  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
263  if ( !found ) {
265  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
266  close(eol_fd);
268  }
269  }
270  }
271  //also create EoR file in FU data directory
272  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
273  if (!eorFound) {
274  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_=0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return false;
282  }
284  //this is not reachable
285  return true;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return true;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_=0;
297  maybeOpenNewLumiSection( event_->lumi() );
298  }
299  }
302  else
303  eventRunNumber_=event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return true;
309  }
310  }
311 }
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 313 of file FedRawDataInputSource.cc.

References daqDirector_ and evf::EvFDaqDirector::getBoLSFilePathOnFU().

Referenced by maybeOpenNewLumiSection().

314 {
315  //used for backpressure mechanisms and monitoring
316  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
317  struct stat buf;
318  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
319  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
320  close(bol_fd);
321  }
322 }
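
The listing creates an empty marker file with open() and closes the descriptor without checking it. A hedged variant that reports failure to the caller could look like the sketch below. touchMarker is a hypothetical helper and not part of FedRawDataInputSource; it keeps the same flags and permission bits as the listing.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Create an empty marker file unless it already exists.
// Returns true if the file is present when the function returns.
bool touchMarker(const std::string& path, bool checkIfExists) {
  struct stat buf;
  if (checkIfExists && stat(path.c_str(), &buf) == 0)
    return true;  // already there, nothing to do
  const int fd = open(path.c_str(), O_RDWR | O_CREAT,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0)
    return false;  // e.g. missing directory or no write permission
  close(fd);
  return true;
}
```

Returning a bool lets a caller log or react when the marker cannot be written. Whether the real source should do so is a design decision outside this page.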
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 638 of file FedRawDataInputSource.cc.

References fileListMode_ and LogDebug.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

639 {
640  //no deletion in file list mode
641  if (fileListMode_) return;
642 
643  const boost::filesystem::path filePath(fileName);
644  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
645  try {
646  //sometimes this fails but file gets deleted
647  boost::filesystem::remove(filePath);
648  }
649  catch (const boost::filesystem::filesystem_error& ex)
650  {
651  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
652  << ". Trying again.";
653  usleep(100000);
654  try {
655  boost::filesystem::remove(filePath);
656  }
657  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
658  }
659  catch (std::exception& ex)
660  {
661  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
662  << ". Trying again.";
663  usleep(100000);
664  try {
665  boost::filesystem::remove(filePath);
666  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
667  }
668 }
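
The listing retries the removal once after a 100 ms pause, and its comments note that the file is sometimes deleted even though an exception is thrown. The same retry idea can be sketched without exceptions, using the non-throwing overloads of std::filesystem (C++17) in place of the boost::filesystem calls above. removeWithRetry is a hypothetical helper introduced only for this illustration.

```cpp
#include <cassert>
#include <chrono>
#include <filesystem>
#include <system_error>
#include <thread>

// Try to remove `path`, retrying once after a short pause on error.
// Returns true if the path no longer exists afterwards.
bool removeWithRetry(const std::filesystem::path& path) {
  std::error_code ec;
  std::filesystem::remove(path, ec);  // non-throwing overload
  if (ec) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    ec.clear();
    std::filesystem::remove(path, ec);
  }
  // The file may be gone even if an error was reported, so check directly.
  std::error_code existsEc;
  return !std::filesystem::exists(path, existsEc);
}
```

Checking for existence at the end, instead of trusting the error code, mirrors the observation in the listing's comments.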
bool FedRawDataInputSource::exceptionState ( )
inline private
void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 219 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

220 {
222  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
223  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
224  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
225  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
226  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
227  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
228  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
229  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
230  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
231  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
232  desc.setAllowAnything();
233  descriptions.add("source", desc);
234 }
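
The constructor multiplies eventChunkSize and eventChunkBlock by 1048576, so both parameters are given in MB, and the parameter comment requires the block to be no larger than the chunk. A constructor comment also asks for the chunk size to be a whole multiple of the block size, but the enforcing lines are not shown on this page. The sketch below illustrates one way to apply both rules. makeChunkLayout and ChunkLayout are hypothetical, and rejecting bad values with an exception is an assumption, not the documented behaviour.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

constexpr std::uint64_t kMiB = 1048576;  // multiplier used in the constructor listing

struct ChunkLayout {
  std::uint64_t chunkBytes;
  std::uint64_t blockBytes;
  std::uint64_t blocksPerChunk;
};

// Convert the MB-valued parameters to bytes and check that a chunk
// holds a whole number of blocks. The rejection policy is an assumption.
ChunkLayout makeChunkLayout(unsigned int eventChunkSizeMB, unsigned int eventChunkBlockMB) {
  if (eventChunkBlockMB == 0 || eventChunkBlockMB > eventChunkSizeMB)
    throw std::invalid_argument("block size must be non-zero and at most the chunk size");
  if (eventChunkSizeMB % eventChunkBlockMB != 0)
    throw std::invalid_argument("chunk size must be a multiple of the block size");
  ChunkLayout layout{eventChunkSizeMB * kMiB, eventChunkBlockMB * kMiB,
                     eventChunkSizeMB / eventChunkBlockMB};
  assert(layout.chunkBytes == layout.blockBytes * layout.blocksPerChunk);
  return layout;
}
```

With the defaults registered above (32 and 32), a chunk is read in a single 32 MB block.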
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData)
private

Definition at line 747 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, FEDRawDataCollection::FEDData(), FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), FEDHeader::sourceID(), and tcds_pointer_.

Referenced by read().

748 {
750  timeval stv;
751  gettimeofday(&stv,nullptr);
752  time = stv.tv_sec;
753  time = (time << 32) + stv.tv_usec;
754  edm::Timestamp tstamp(time);
755 
756  uint32_t eventSize = event_->eventSize();
757  unsigned char* event = (unsigned char*)event_->payload();
758  GTPEventID_=0;
759  tcds_pointer_ = nullptr;
760  while (eventSize > 0) {
761  assert(eventSize>=FEDTrailer::length);
762  eventSize -= FEDTrailer::length;
763  const FEDTrailer fedTrailer(event + eventSize);
764  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
765  assert(eventSize>=fedSize - FEDHeader::length);
766  eventSize -= (fedSize - FEDHeader::length);
767  const FEDHeader fedHeader(event + eventSize);
768  const uint16_t fedId = fedHeader.sourceID();
769  if(fedId>FEDNumbering::MAXFEDID)
770  {
771  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
772  }
773  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
774  tcds_pointer_ = event + eventSize;
775  }
776  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
777  if (evf::evtn::evm_board_sense(event + eventSize,fedSize))
778  GTPEventID_ = evf::evtn::get(event + eventSize,true);
779  else
780  GTPEventID_ = evf::evtn::get(event + eventSize,false);
781  //evf::evtn::evm_board_setformat(fedSize);
782  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
783  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
784  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
785  }
786  //take event ID from GTPE FED
787  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
788  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
789  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
790  }
791  }
792  FEDRawData& fedData = rawData.FEDData(fedId);
793  fedData.resize(fedSize);
794  memcpy(fedData.data(), event + eventSize, fedSize);
795  }
796  assert(eventSize == 0);
797 
798  return tstamp;
799 }
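
The loop above walks the event payload from the end: it reads a trailer, steps back by the fragment length recorded there, reads the header at that position, and repeats until no bytes remain. The sketch below shows the same back-to-front walk on a deliberately simplified fragment layout. The layout, appendFragment and splitFragments are assumptions made for illustration and do not reproduce the real FED header or trailer bit fields. The sketch also throws on malformed input where the listing uses assert.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Toy fragment layout (NOT the real FED format):
//   header  : 8 bytes, first 2 bytes = source id
//   payload : any multiple of 8 bytes
//   trailer : 8 bytes, first 4 bytes = total fragment length in 8-byte words
constexpr std::size_t kHeaderLen = 8;
constexpr std::size_t kTrailerLen = 8;

// Append one toy fragment to `buf`.
void appendFragment(std::vector<unsigned char>& buf, std::uint16_t id, std::uint32_t payloadWords) {
  const std::uint32_t words = payloadWords + 2;  // header + payload + trailer
  std::vector<unsigned char> frag(static_cast<std::size_t>(words) * 8, 0);
  std::memcpy(frag.data(), &id, sizeof(id));
  std::memcpy(frag.data() + frag.size() - kTrailerLen, &words, sizeof(words));
  buf.insert(buf.end(), frag.begin(), frag.end());
}

// Walk the buffer from the end and return source id -> fragment size in bytes.
std::map<std::uint16_t, std::uint32_t> splitFragments(const std::vector<unsigned char>& buf) {
  std::map<std::uint16_t, std::uint32_t> sizes;
  std::size_t remaining = buf.size();
  while (remaining > 0) {
    if (remaining < kTrailerLen) throw std::runtime_error("truncated trailer");
    std::uint32_t words = 0;
    std::memcpy(&words, buf.data() + remaining - kTrailerLen, sizeof(words));
    const std::size_t fragSize = static_cast<std::size_t>(words) * 8;
    if (fragSize < kHeaderLen + kTrailerLen || fragSize > remaining)
      throw std::runtime_error("bad fragment length");
    remaining -= fragSize;  // now the offset of this fragment's header
    std::uint16_t id = 0;
    std::memcpy(&id, buf.data() + remaining, sizeof(id));
    sizes[id] = static_cast<std::uint32_t>(fragSize);
  }
  return sizes;
}
```

Walking backwards works because only the trailer records the fragment length, so the start of each fragment is known only once its end has been located.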
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int lumi, bool erase )

Definition at line 1392 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1393 {
1394  std::lock_guard<std::mutex> lock(monlock_);
1395  auto itr = sourceEventsReport_.find(lumi);
1396  if (itr!=sourceEventsReport_.end()) {
1397  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1398  if (erase)
1399  sourceEventsReport_.erase(itr);
1400  return ret;
1401  }
1402  else
1403  return std::pair<bool,unsigned int>(false,0);
1404 }
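getEventReport() looks up a per-lumisection counter under a mutex, copies the value, and optionally erases the entry before returning. The sketch below reproduces that pattern in isolation; the class `EventReportTable` and its member names are hypothetical stand-ins, not the real members of FedRawDataInputSource.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the per-lumisection event counters.
class EventReportTable {
public:
  // Records (or overwrites) the number of events seen for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns {true, count} if the lumisection is known, {false, 0} otherwise.
  // When erase is true, the entry is removed after its value has been copied.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end())
      return std::pair<bool, unsigned int>(false, 0u);
    const std::pair<bool, unsigned int> result(true, itr->second);
    if (erase)
      counts_.erase(itr);
    return result;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Copying the value before erasing matters: after `erase(itr)` the iterator is invalid, so the result has to be built first, as the listing does.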
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int & ls, std::string & nextFile, uint32_t & fsize, uint64_t & lockWaitTime )
private

Definition at line 1435 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, callgraph::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1436 {
1437  if (fileListIndex_ < fileNames_.size()) {
1438  nextFile = fileNames_[fileListIndex_];
1439  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1440  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1441  boost::filesystem::path fileName = nextFile;
1442  std::string fileStem = fileName.stem().string();
1443  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1444  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1445 
1446  if (!fileListLoopMode_)
1447  ls = boost::lexical_cast<unsigned int>(fileStem);
1448  else //always starting from LS 1 in loop mode
1449  ls = 1 + loopModeIterationInc_;
1450 
1451  //fsize = 0;
1452  //lockWaitTime = 0;
1453  fileListIndex_++;
1455  }
1456  else {
1457  if (!fileListLoopMode_)
1459  else {
1460  //loop through files until interrupted
1462  fileListIndex_=0;
1463  return getFile(ls,nextFile,fsize,lockWaitTime);
1464  }
1465  }
1466 }
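In file-list mode, getFile() takes the lumisection number from the file stem: it keeps the text after "ls" and cuts it at the next underscore. The sketch below shows one way to do that parsing with explicit `std::string::npos` checks (`std::string::find` returns `npos` when nothing is found and 0 when the match is at the start, so its result cannot be used directly as a boolean). The helper name `lumiFromStem` and the example stem "run000001_ls0042_index000003" are assumptions for illustration.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: extracts the lumisection number from a file stem such
// as "run000001_ls0042_index000003" (the naming pattern is an assumption).
// Returns false and leaves `lumi` untouched when no "ls<digits>" field exists.
inline bool lumiFromStem(const std::string& stem, unsigned int& lumi) {
  const std::string::size_type lsPos = stem.find("ls");
  if (lsPos == std::string::npos)
    return false;
  // Keep the text after "ls" and cut it at the next underscore, if any.
  std::string field = stem.substr(lsPos + 2);
  const std::string::size_type sep = field.find('_');
  if (sep != std::string::npos)
    field = field.substr(0, sep);
  // Accept only a short, purely numeric field so the conversion cannot overflow.
  if (field.empty() || field.size() > 9 ||
      field.find_first_not_of("0123456789") != std::string::npos)
    return false;
  lumi = static_cast<unsigned int>(std::stoul(field));
  return true;
}
```

Unlike the `boost::lexical_cast` call in the listing, this sketch reports malformed names through its return value instead of throwing.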
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 375 of file FedRawDataInputSource.cc.

References InputFile::advance(), bufferInputRead_, InputFile::bufferPosition_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, threadError(), mps_check::timeout, verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

376 {
377 
379  if (!currentFile_)
380  {
383  if (!fileQueue_.try_pop(currentFile_))
384  {
385  //sleep until wakeup (only in single-buffer mode) or timeout
386  std::unique_lock<std::mutex> lkw(mWakeup_);
387  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
389  }
390  status = currentFile_->status_;
391  if ( status == evf::EvFDaqDirector::runEnded)
392  {
394  delete currentFile_;
395  currentFile_=nullptr;
396  return status;
397  }
398  else if ( status == evf::EvFDaqDirector::runAbort)
399  {
400  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
401  }
402  else if (status == evf::EvFDaqDirector::newLumi)
403  {
405  if (getLSFromFilename_) {
408  eventsThisLumi_=0;
410  }
411  }
412  else {//let this be picked up from next event
414  }
415 
416  delete currentFile_;
417  currentFile_=nullptr;
418  return status;
419  }
420  else if (status == evf::EvFDaqDirector::newFile) {
422  }
423  else
424  assert(false);
425  }
427 
428  //file is empty
429  if (!currentFile_->fileSize_) {
431  //try to open new lumi
432  assert(currentFile_->nChunks_==0);
433  if (getLSFromFilename_)
436  eventsThisLumi_=0;
438  }
439  //immediately delete empty file
441  delete currentFile_;
442  currentFile_=nullptr;
444  }
445 
446  //file is finished
449  //release last chunk (it is never released elsewhere)
452  {
453  throw cms::Exception("FedRawDataInputSource::getNextEvent")
454  << "Fully processed " << currentFile_->nProcessed_
455  << " from the file " << currentFile_->fileName_
456  << " but according to BU JSON there should be "
457  << currentFile_->nEvents_ << " events";
458  }
459  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
460  if (singleBufferMode_) {
461  std::unique_lock<std::mutex> lkw(mWakeup_);
462  cvWakeup_.notify_one();
463  }
466  //put the file in pending delete list;
467  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
468  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
469  }
470  else {
471  //in single-thread and stream jobs, events are already processed
473  delete currentFile_;
474  }
475  currentFile_=nullptr;
477  }
478 
479 
480  //file is too short
482  {
483  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
484  "Premature end of input file while reading event header";
485  }
486  if (singleBufferMode_) {
487 
488  //should already be there
491  usleep(10000);
493  }
495 
496  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
497 
498  //conditions when read amount is not sufficient for the header to fit
499  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
501  {
503 
504  if (detectedFRDversion_==0) {
505  detectedFRDversion_=*((uint32*)dataPosition);
506  if (detectedFRDversion_>5)
507  throw cms::Exception("FedRawDataInputSource::getNextEvent")
508  << "Unknown FRD version -: " << detectedFRDversion_;
509  assert(detectedFRDversion_>=1);
510  }
511 
512  //recalculate chunk position
513  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
514  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
515  {
516  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
517  "Premature end of input file while reading event header";
518  }
519  }
520 
521  event_.reset( new FRDEventMsgView(dataPosition) );
522  if (event_->size()>eventChunkSize_) {
523  throw cms::Exception("FedRawDataInputSource::getNextEvent")
524  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
525  << " run:" << event_->run() << " of size:" << event_->size()
526  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
527  }
528 
529  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
530 
532  {
533  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
534  "Premature end of input file while reading event data";
535  }
536  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
538  //recalculate chunk position
539  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
540  event_.reset( new FRDEventMsgView(dataPosition) );
541  }
542  currentFile_->bufferPosition_ += event_->size();
543  currentFile_->chunkPosition_ += event_->size();
544  //last chunk is released when this function is invoked next time
545 
546  }
547  //multibuffer mode:
548  else
549  {
550  //wait for the current chunk to become added to the vector
553  usleep(10000);
555  }
557 
558  //check if header is at the boundary of two chunks
559  chunkIsFree_ = false;
560  unsigned char *dataPosition;
561 
562  //read header, copy it to a single chunk if necessary
563  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
564 
565  event_.reset( new FRDEventMsgView(dataPosition) );
566  if (event_->size()>eventChunkSize_) {
567  throw cms::Exception("FedRawDataInputSource::getNextEvent")
568  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
569  << " run:" << event_->run() << " of size:" << event_->size()
570  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
571  }
572 
573  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
574 
576  {
577  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
578  "Premature end of input file while reading event data";
579  }
580 
581  if (chunkEnd) {
582  //header was at the chunk boundary, we will have to move payload as well
583  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
584  chunkIsFree_ = true;
585  }
586  else {
587  //header was contiguous, but check if payload fits the chunk
588  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
589  //rewind to header start position
590  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
591  //copy event to a chunk start and move pointers
592  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
593  assert(chunkEnd);
594  chunkIsFree_=true;
595  //header is moved
596  event_.reset( new FRDEventMsgView(dataPosition) );
597  }
598  else {
599  //everything is in a single chunk, only move pointers forward
600  chunkEnd = currentFile_->advance(dataPosition,msgSize);
601  assert(!chunkEnd);
602  chunkIsFree_=false;
603  }
604  }
605  }//end multibuffer mode
607 
608  if (verifyChecksum_ && event_->version() >= 5)
609  {
610  uint32_t crc=0;
611  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
612  if ( crc != event_->crc32c() ) {
614  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
615  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
616  " but calculated 0x" << crc;
617  }
618  }
619  else if ( verifyAdler32_ && event_->version() >= 3)
620  {
621  uint32_t adler = adler32(0L,Z_NULL,0);
622  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
623 
624  if ( adler != event_->adler32() ) {
626  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
627  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
628  " but calculated 0x" << adler;
629  }
630  }
632 
634 
636 }
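The end of getNextEvent() verifies each event payload against a stored checksum (CRC32C for FRD version 5 and later, Adler-32 for version 3 and later). The sketch below illustrates the Adler-32 branch only. It writes the algorithm out by hand so that it has no zlib dependency; `adler32Of` and `payloadMatchesAdler32` are hypothetical names, whereas the listing calls zlib's `adler32()`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Plain Adler-32 over a byte buffer (the same algorithm that zlib's
// adler32() implements, written out to keep the sketch dependency-free).
inline std::uint32_t adler32Of(const unsigned char* data, std::size_t len) {
  const std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = 1;
  std::uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Hypothetical check: true when the stored checksum matches the payload.
inline bool payloadMatchesAdler32(const unsigned char* payload, std::size_t len,
                                  std::uint32_t stored) {
  return adler32Of(payload, len) == stored;
}
```

An empty buffer yields 1, which is the same seed value the listing obtains from `adler32(0L,Z_NULL,0)`.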
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 801 of file FedRawDataInputSource.cc.

References popcon2dropbox::copy(), daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, MillePedeFileConverter_cfg::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), mps_fire::i, LogDebug, Json::Reader::parse(), callgraph::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

802 {
804  try {
805  // assemble json destination path
807 
808  //TODO:should be ported to use fffnaming
809  std::ostringstream fileNameWithPID;
810  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
811  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
812  jsonDestPath /= fileNameWithPID.str();
813 
814  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
815  << jsonDestPath;
816  try {
817  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
818  }
819  catch (const boost::filesystem::filesystem_error& ex)
820  {
821  // Input dir gone?
822  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
823  // << " Maybe the file is not yet visible by FU. Trying again in one second";
824  sleep(1);
825  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
826  }
828 
829  try {
830  //sometimes this fails but file gets deleted
831  boost::filesystem::remove(jsonSourcePath);
832  }
833  catch (const boost::filesystem::filesystem_error& ex)
834  {
835  // Input dir gone?
836  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
837  }
838  catch (std::exception& ex)
839  {
840  // Input dir gone?
841  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
842  }
843 
844  boost::filesystem::ifstream ij(jsonDestPath);
845  Json::Value deserializeRoot;
847 
848  std::stringstream ss;
849  ss << ij.rdbuf();
850  if (!reader.parse(ss.str(), deserializeRoot)) {
851  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
852  << "\nERROR:\n" << reader.getFormatedErrorMessages()
853  << "CONTENT:\n" << ss.str()<<".";
854  throw std::runtime_error("Cannot deserialize input JSON file");
855  }
856 
857  //read BU JSON
859  DataPoint dp;
860  dp.deserialize(deserializeRoot);
861  bool success = false;
862  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
863  if (dpd_->getNames().at(i)=="NEvents")
864  if (i<dp.getData().size()) {
865  data = dp.getData()[i];
866  success=true;
867  }
868  }
869  if (!success) {
870  if (!dp.getData().empty())
871  data = dp.getData()[0];
872  else
873  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
874  " error reading number of events from BU JSON -: No input value " << data;
875  }
876  return boost::lexical_cast<int>(data);
877 
878  }
879  catch (const boost::filesystem::filesystem_error& ex)
880  {
881  // Input dir gone?
883  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
884  }
885  catch (std::runtime_error e)
886  {
887  // Another process grabbed the file and NFS did not register this
889  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
890  }
891 
892  catch( boost::bad_lexical_cast const& ) {
893  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
894  << "Input value is -: " << data;
895  }
896 
897  catch (std::exception e)
898  {
899  // BU run directory disappeared?
901  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
902  }
903 
904  return -1;
905 }
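grabNextJsonFile() builds the local JSON file name by appending "_pid", the process id zero-padded to five digits, and the ".jsn" extension to the source file stem. A minimal sketch of that formatting step follows; `jsonNameWithPid` is a hypothetical helper, and the process id is passed in as a parameter rather than taken from `getpid()` so the function stays testable.

```cpp
#include <cassert>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper: appends "_pid" and the zero-padded process id to a
// file stem and adds the ".jsn" extension, following the stream formatting
// used in the listing.
inline std::string jsonNameWithPid(const std::string& stem, long pid) {
  std::ostringstream name;
  // setw applies only to the next inserted value, i.e. the pid.
  name << stem << "_pid" << std::setfill('0') << std::setw(5) << pid << ".jsn";
  return name.str();
}
```

The width of five is a minimum: a process id with more digits is written in full, not truncated.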
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1406 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, callgraph::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1407 {
1408  std::sort(fileNames_.begin(),fileNames_.end(),
1409  [](std::string a, std::string b) {
1410  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1411  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1412  return b > a;});
1413 
1414  if (!fileNames_.empty()) {
1415  //get run number from first file in the vector
1417  std::string fileStem = fileName.stem().string();
1418  auto end = fileStem.find("_");
1419  if (fileStem.find("run")==0) {
1420  std::string runStr = fileStem.substr(3,end-3);
1421  try {
1422  //get long to support test run numbers < 2^32
1423  long rval = boost::lexical_cast<long>(runStr);
1424  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1425  return rval;
1426  }
1427  catch( boost::bad_lexical_cast const& ) {
1428  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1429  }
1430  }
1431  }
1432  return -1;
1433 }
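initFileList() sorts the configured files by name without their directory part and then tries to read the run number from a leading "run<digits>" field. The sketch below shows both steps under simplifying assumptions: the helper names are hypothetical, the run number is taken to end at the first non-digit character (the listing cuts at the first underscore instead), and malformed names return -1 instead of logging a warning.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Returns the part of a path after the last '/', or the whole string.
inline std::string baseName(const std::string& path) {
  const std::string::size_type slash = path.rfind('/');
  return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Sorts paths by file name only, ignoring their directories.
inline void sortByBaseName(std::vector<std::string>& paths) {
  std::sort(paths.begin(), paths.end(),
            [](const std::string& a, const std::string& b) {
              return baseName(a) < baseName(b);
            });
}

// Hypothetical helper: reads the run number from a file name starting with
// "run<digits>"; returns -1 when the name does not follow that pattern.
inline long runFromFileName(const std::string& path) {
  const std::string name = baseName(path);
  if (name.compare(0, 3, "run") != 0)
    return -1;
  std::string::size_type end = name.find_first_not_of("0123456789", 3);
  if (end == std::string::npos)
    end = name.size();
  // Require at least one digit, and few enough digits to fit in a long.
  if (end == 3 || end - 3 > 9)
    return -1;
  return std::stol(name.substr(3, end - 3));
}
```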
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 324 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, runEdmFileComparison::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), trackingPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

325 {
327  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
328 
329  if ( currentLumiSection_ > 0) {
330  const std::string fuEoLS =
332  struct stat buf;
333  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
334  if ( !found ) {
336  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
337  close(eol_fd);
338  createBoLSFile(lumiSection,false);
340  }
341  }
342  else createBoLSFile(lumiSection,true);//needed for initial lumisection
343 
344  currentLumiSection_ = lumiSection;
345 
347 
348  timeval tv;
349  gettimeofday(&tv, nullptr);
350  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
351 
352  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
354  runAuxiliary()->run(),
355  lumiSection, lsopentime,
357 
358  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
359  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
360 
361  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
362  }
363 }
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 365 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNextEvent().

366 {
368  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
369  {
370  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
371  }
372  return status;
373 }
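nextEvent() keeps calling getNextEvent() for as long as it returns `noFile`, and leaves the loop early once the global shutdown flag is set. The sketch below isolates that polling pattern; the `FileStatus` enum here is a reduced stand-in for `evf::EvFDaqDirector::FileStatus`, and `pollUntilData` is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>

// Reduced stand-in for the status codes used by the input source.
enum class FileStatus { noFile, newFile, runEnded };

// Hypothetical polling loop: keeps calling `poll` while it reports noFile and
// stops early once `shutdown` is set, returning the last status seen.
template <typename Poll>
FileStatus pollUntilData(Poll poll, const std::atomic<bool>& shutdown) {
  FileStatus status;
  while ((status = poll()) == FileStatus::noFile) {
    if (shutdown.load(std::memory_order_relaxed))
      break;
  }
  return status;
}
```

As in the listing, the flag is checked only after a poll has returned `noFile`, so at least one poll always happens, and a caller that sees `noFile` come back can infer that shutdown was requested.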
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 671 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTracker_, edm::EventPrincipal::streamID(), tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

672 {
674  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
675  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
676 
677  if (useL1EventID_){
679  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
681  aux.setProcessHistoryID(processHistoryID_);
682  makeEvent(eventPrincipal, aux);
683  }
684  else if(tcds_pointer_==nullptr){
685  assert(GTPEventID_);
687  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
689  aux.setProcessHistoryID(processHistoryID_);
690  makeEvent(eventPrincipal, aux);
691  }
692  else{
693  const FEDHeader fedHeader(tcds_pointer_);
694  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
697  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
700  makeEvent(eventPrincipal, aux);
701  }
702 
703 
704 
705  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
706 
707  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
709 
710  eventsThisLumi_++;
712 
713  //resize vector if needed
714  while (streamFileTracker_.size() <= eventPrincipal.streamID())
715  streamFileTracker_.push_back(-1);
716 
718 
719  //this old file check runs no more often than every 10 events
720  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
721  //delete files that are not in processing
722  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
723  auto it = filesToDelete_.begin();
724  while (it!=filesToDelete_.end()) {
725  bool fileIsBeingProcessed = false;
726  for (unsigned int i=0;i<nStreams_;i++) {
727  if (it->first == streamFileTracker_.at(i)) {
728  fileIsBeingProcessed = true;
729  break;
730  }
731  }
732  if (!fileIsBeingProcessed) {
733  deleteFile(it->second->fileName_);
734  delete it->second;
735  it = filesToDelete_.erase(it);
736  }
737  else it++;
738  }
739 
740  }
742  chunkIsFree_=false;
744  return;
745 }
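The cleanup loop near the end of read() walks the pending-delete list and removes every file that no stream is still tracking. The sketch below shows that erase-while-iterating pattern on its own. The function name and the use of plain file names in place of `InputFile*` are assumptions, and instead of deleting anything it returns the names the caller would delete.

```cpp
#include <cassert>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Hypothetical cleanup pass: each pending entry is (file index, file name).
// An entry is dropped only if no stream currently tracks its file index; the
// names of the dropped files are returned so the caller can delete them.
inline std::vector<std::string> collectDeletable(
    std::list<std::pair<int, std::string>>& pending,
    const std::vector<int>& streamFileTracker) {
  std::vector<std::string> deletable;
  auto it = pending.begin();
  while (it != pending.end()) {
    bool inUse = false;
    for (int tracked : streamFileTracker) {
      if (tracked == it->first) {
        inUse = true;
        break;
      }
    }
    if (!inUse) {
      deletable.push_back(it->second);
      it = pending.erase(it);  // erase returns the next valid iterator
    } else {
      ++it;
    }
  }
  return deletable;
}
```

In the listing this pass runs while `fileDeleteLock_` is held; the sketch leaves locking to the caller.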
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1327 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, mps_fire::i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1328 {
1329 
1330  if (fileDescriptor_<0) {
1331  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1332  bufferInputRead_ = 0;
1333  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1334  if (fileDescriptor_>=0)
1335  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1336  else
1337  {
1338  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1339  << file->fileName_ << " fd:" << fileDescriptor_;
1340  }
1341  }
1342 
1343  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1344  uint32_t existingSize = 0;
1345  for (unsigned int i=0;i<readBlocks_;i++)
1346  {
1347  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1348  bufferInputRead_+=last;
1349  existingSize+=last;
1350  }
1351  }
1352  else {
1353  const uint32_t chunksize = file->chunkPosition_;
1354  const uint32_t blockcount=chunksize/eventChunkBlock_;
1355  const uint32_t leftsize = chunksize%eventChunkBlock_;
1356  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1357  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1358 
1359  for (uint32_t i=0;i<blockcount;i++) {
1360  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1361  bufferInputRead_+=last;
1362  existingSize+=last;
1363  }
1364  if (leftsize) {
1365  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1366  bufferInputRead_+=last;
1367  }
1368  file->chunkPosition_=0;//data was moved to beginning of the chunk
1369  }
1370  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1371  if (fileDescriptor_!=-1)
1372  {
1373  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1374  close(fileDescriptor_);
1375  fileDescriptor_=-1;
1376  }
1377  }
1378 }
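In single-buffer mode, readNextChunkIntoBuffer() moves the unread tail of the chunk to its beginning with `memmove` and then refills the freed space from the file. The sketch below shows that compact-and-refill step with an in-memory source in place of a file descriptor. `ByteSource` and `compactAndRefill` are hypothetical names, and here `position` means the number of bytes already consumed, so an initial fill is expressed as `position == buffer.size()`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical in-memory stand-in for the input file.
struct ByteSource {
  std::vector<unsigned char> bytes;
  std::size_t offset = 0;
  // Copies up to `want` bytes into `dst`; returns the number copied.
  std::size_t read(unsigned char* dst, std::size_t want) {
    const std::size_t n = std::min(want, bytes.size() - offset);
    if (n > 0)
      std::memcpy(dst, bytes.data() + offset, n);
    offset += n;
    return n;
  }
};

// Moves the unread tail [position, buffer.size()) to the front of the buffer
// and refills the freed space from `src`. Returns the number of valid bytes.
inline std::size_t compactAndRefill(std::vector<unsigned char>& buffer,
                                    std::size_t position, ByteSource& src) {
  assert(position <= buffer.size());
  const std::size_t kept = buffer.size() - position;
  if (position > 0 && kept > 0) {
    // memmove (not memcpy) because source and destination may overlap.
    std::memmove(buffer.data(), buffer.data() + position, kept);
  }
  return kept + src.read(buffer.data() + kept, position);
}
```

Unlike the listing, which adds the result of `::read` to its counters without checking it, this sketch returns how many bytes are actually valid, so a short final read is visible to the caller.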
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 911 of file FedRawDataInputSource.cc.

References InputFile::chunks_, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), mps_fire::i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, callgraph::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, trackingPlots::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

912 {
913  bool stop=false;
914  unsigned int currentLumiSection = 0;
915  //threadInit_.exchange(true,std::memory_order_acquire);
916 
917  {
918  std::unique_lock<std::mutex> lk(startupLock_);
919  startupCv_.notify_one();
920  }
921 
922  uint32_t ls=0;
923  uint32_t monLS=1;
924  uint32_t lockCount=0;
925  uint64_t sumLockWaitTimeUs=0.;
926 
927  while (!stop) {
928 
929  //wait for at least one free thread and chunk
930  int counter=0;
932  {
933  //report state to monitoring
934  if (fms_) {
935  bool copy_active=false;
936  for (auto j : tid_active_) if (j) copy_active=true;
938  else if (freeChunks_.empty()) {
939  if (copy_active)
941  else
943  }
944  else {
945  if (copy_active)
947  else
949  }
950  }
951  std::unique_lock<std::mutex> lkw(mWakeup_);
952  //sleep until woken up by condition or a timeout
953  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
954  counter++;
955  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
956  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
957  }
958  else {
959  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
960  }
961  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
962  }
963 
964  if (stop) break;
965 
966  //look for a new file
967  std::string nextFile;
968  uint32_t fileSize;
969 
970  if (fms_) {
974  }
975 
977 
978  while (status == evf::EvFDaqDirector::noFile) {
979  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
980  stop=true;
981  break;
982  }
983 
984  uint64_t thisLockWaitTimeUs=0.;
985  if (fileListMode_) {
986  //return LS if LS not set, otherwise return file
987  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
988  }
989  else
990  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
991 
993 
994  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
995 
996  //monitoring of lock wait time
997  if (thisLockWaitTimeUs>0.)
998  sumLockWaitTimeUs+=thisLockWaitTimeUs;
999  lockCount++;
1000  if (ls>monLS) {
1001  monLS=ls;
1002  if (lockCount)
1003  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1004  lockCount=0;
1005  sumLockWaitTimeUs=0;
1006  }
1007 
1008  //check again for any remaining index/EoLS files after EoR file is seen
1009  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1011  usleep(100000);
1012  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1013  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1014  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1015  }
1016 
1017  if ( status == evf::EvFDaqDirector::runEnded) {
1019  stop=true;
1020  break;
1021  }
1022 
1023  //error from filelocking function
1024  if (status == evf::EvFDaqDirector::runAbort) {
1026  stop=true;
1027  break;
1028  }
1029  //queue new lumisection
1030  if( getLSFromFilename_ && ls > currentLumiSection) {
1031  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1032  currentLumiSection = ls;
1033  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1034  }
1035 
1036  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1037  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1039  stop=true;
1040  break;
1041  }
1042 
1043  int dbgcount=0;
1044  if (status == evf::EvFDaqDirector::noFile) {
1046  dbgcount++;
1047  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1048  usleep(100000);
1049  }
1050  }
1051  if ( status == evf::EvFDaqDirector::newFile ) {
1053  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1054 
1055 
1056  boost::filesystem::path rawFilePath(nextFile);
1057  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1058 
1059  struct stat st;
1060  int stat_res = stat(rawFile.c_str(),&st);
1061  if (stat_res==-1) {
1062  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1063  setExceptionState_=true;
1064  break;
1065  }
1066  fileSize=st.st_size;
1067 
1068  if (fms_) {
1072  }
1073  int eventsInNewFile;
1074  if (fileListMode_) {
1075  if (fileSize==0) eventsInNewFile=0;
1076  else eventsInNewFile=-1;
1077  }
1078  else {
1079  eventsInNewFile = grabNextJsonFile(nextFile);
1080  assert( eventsInNewFile>=0 );
1081  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1082  }
1083 
1084  if (!singleBufferMode_) {
1085  //calculate number of needed chunks
1086  unsigned int neededChunks = fileSize/eventChunkSize_;
1087  if (fileSize%eventChunkSize_) neededChunks++;
1088 
1089  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1091  fileQueue_.push(newInputFile);
1092 
1093  for (unsigned int i=0;i<neededChunks;i++) {
1094 
1095  if (fms_) {
1096  bool copy_active=false;
1097  for (auto j : tid_active_) if (j) copy_active=true;
1098  if (copy_active)
1100  else
1102  }
1103  //get thread
1104  unsigned int newTid = 0xffffffff;
1105  while (!workerPool_.try_pop(newTid)) {
1106  usleep(100000);
1107  }
1108 
1109  if (fms_) {
1110  bool copy_active=false;
1111  for (auto j : tid_active_) if (j) copy_active=true;
1112  if (copy_active)
1114  else
1116  }
1117  InputChunk * newChunk = nullptr;
1118  while (!freeChunks_.try_pop(newChunk)) {
1119  usleep(100000);
1120  if (quit_threads_.load(std::memory_order_relaxed)) break;
1121  }
1122 
1123  if (newChunk == nullptr) {
1124  //return unused tid if we received shutdown (nullptr chunk)
1125  if (newTid!=0xffffffff) workerPool_.push(newTid);
1126  stop = true;
1127  break;
1128  }
1130 
1131  std::unique_lock<std::mutex> lk(mReader_);
1132 
1133  unsigned int toRead = eventChunkSize_;
1134  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1135  newChunk->reset(i*eventChunkSize_,toRead,i);
1136 
1137  workerJob_[newTid].first=newInputFile;
1138  workerJob_[newTid].second=newChunk;
1139 
1140  //wake up the worker thread
1141  cvReader_[newTid]->notify_one();
1142  }
1143  }
1144  else {
1145  if (!eventsInNewFile) {
1146  //still queue file for lumi update
1147  std::unique_lock<std::mutex> lkw(mWakeup_);
1148  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1150  fileQueue_.push(newInputFile);
1151  cvWakeup_.notify_one();
1152  break;
1153  }
1154  //in single-buffer mode put single chunk in the file and let the main thread read the file
1155  InputChunk * newChunk;
1156  //should be available immediately
1157  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1158 
1159  std::unique_lock<std::mutex> lkw(mWakeup_);
1160 
1161  unsigned int toRead = eventChunkSize_;
1162  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1163  newChunk->reset(0,toRead,0);
1164  newChunk->readComplete_=true;
1165 
1166  //push file and wakeup main thread
1167  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1168  newInputFile->chunks_[0]=newChunk;
1170  fileQueue_.push(newInputFile);
1171  cvWakeup_.notify_one();
1172  }
1173  }
1174  }
1176  //make sure threads finish reading
1177  unsigned numFinishedThreads = 0;
1178  while (numFinishedThreads < workerThreads_.size()) {
1179  unsigned int tid;
1180  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1181  std::unique_lock<std::mutex> lk(mReader_);
1182  thread_quit_signal[tid]=true;
1183  cvReader_[tid]->notify_one();
1184  numFinishedThreads++;
1185  }
1186  for (unsigned int i=0;i<workerThreads_.size();i++) {
1187  workerThreads_[i]->join();
1188  delete workerThreads_[i];
1189  }
1190 }
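The chunk bookkeeping in readSupervisor() (listing lines 1086-1087 and 1133-1135) can be looked at in isolation. The following is a minimal sketch of that arithmetic, not the CMSSW code: `ChunkPlan` and `planChunks` are hypothetical names introduced here, and the sketch assumes only what the listing shows, namely that the chunk count is the file size divided by the chunk size, rounded up, and that only the last chunk may be shorter than a full chunk.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper type: one read request covering part of a raw file.
struct ChunkPlan {
  uint64_t offset;     // byte offset of the chunk within the file
  uint32_t toRead;     // number of bytes to read for this chunk
  unsigned int index;  // position of the chunk within the file
};

// Split a file of fileSize bytes into chunks of at most chunkSize bytes.
// Mirrors the rounding in the listing: one extra chunk if there is a remainder.
std::vector<ChunkPlan> planChunks(uint64_t fileSize, uint32_t chunkSize) {
  assert(chunkSize > 0);
  std::vector<ChunkPlan> plan;
  uint64_t needed = fileSize / chunkSize;
  if (fileSize % chunkSize) ++needed;
  for (uint64_t i = 0; i < needed; ++i) {
    uint32_t toRead = chunkSize;
    // Only the last chunk can be partial.
    if (i == needed - 1 && fileSize % chunkSize)
      toRead = static_cast<uint32_t>(fileSize % chunkSize);
    plan.push_back({i * chunkSize, toRead, static_cast<unsigned int>(i)});
  }
  return plan;
}
```

Under these assumptions, a 10-byte file with a 4-byte chunk size yields three chunks, the last one starting at offset 8 with 2 bytes to read; an empty file yields no chunks.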
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1192 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, diffTreeTool::diff, end, eventChunkBlock_, FrontierConditions_GlobalTag_cff::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, mps_fire::i, init, plotBeamSpotDB::last, LogDebug, mReader_, cmsPerfSuiteHarvest::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1193 {
1194  bool init = true;
1195  threadInit_.exchange(true,std::memory_order_acquire);
1196 
1197  while (true) {
1198 
1199  tid_active_[tid]=false;
1200  std::unique_lock<std::mutex> lk(mReader_);
1201  workerJob_[tid].first=nullptr;
1202  workerJob_[tid].second=nullptr;
1203 
1204  assert(!thread_quit_signal[tid]);//should never get it here
1205  workerPool_.push(tid);
1206 
1207  if (init) {
1208  std::unique_lock<std::mutex> lk(startupLock_);
1209  init = false;
1210  startupCv_.notify_one();
1211  }
1212  cvReader_[tid]->wait(lk);
1213 
1214  if (thread_quit_signal[tid]) return;
1215  tid_active_[tid]=true;
1216 
1217  InputFile * file;
1218  InputChunk * chunk;
1219 
1220  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1221 
1222  file = workerJob_[tid].first;
1223  chunk = workerJob_[tid].second;
1224 
1225  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1226  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1227 
1228 
1229  if (fileDescriptor>=0)
1230  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1231  else
1232  {
1233  edm::LogError("FedRawDataInputSource") <<
1234  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1235  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1236  setExceptionState_=true;
1237  return;
1238 
1239  }
1240 
1241  unsigned int bufferLeft = 0;
1243  for (unsigned int i=0;i<readBlocks_;i++)
1244  {
1245  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1246  if ( last > 0 )
1247  bufferLeft+=last;
1248  if (last < eventChunkBlock_) {
1249  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1250  break;
1251  }
1252  }
1254  auto diff = end-start;
1255  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1256  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1257  close(fileDescriptor);
1258 
1259  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1260  assert(detectedFRDversion_<=5);
1261  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1262  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1263 
1264  }
1265 }
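The block-wise read loop in readWorker() (listing lines 1243-1252) fills a chunk buffer in fixed-size blocks and stops at the first short read. The following is a minimal sketch of that pattern using POSIX `read`, not the CMSSW code: `readInBlocks` is a hypothetical name, and the `EINTR` retry and the `-1` error return are additions made here for illustration that do not appear in the listing.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <unistd.h>

// Read up to maxBlocks blocks of blockSize bytes from fd into buf.
// Returns the number of bytes stored, or -1 if read() reports an error.
// Stops early on a short read, as the listing does.
// buf must be able to hold maxBlocks * blockSize bytes.
ssize_t readInBlocks(int fd, unsigned char* buf, size_t blockSize, unsigned int maxBlocks) {
  size_t filled = 0;
  for (unsigned int i = 0; i < maxBlocks; ++i) {
    ssize_t got;
    do {
      got = ::read(fd, buf + filled, blockSize);
    } while (got < 0 && errno == EINTR);  // retry if interrupted by a signal
    if (got < 0) return -1;               // genuine I/O error
    filled += static_cast<size_t>(got);
    if (static_cast<size_t>(got) < blockSize) break;  // short read: nothing more to fetch now
  }
  return static_cast<ssize_t>(filled);
}
```

Returning the byte count lets the caller compare it against the expected chunk size, which is the check the listing performs with its assertion on `usedSize_`.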
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int lumi, unsigned int events )
private

Definition at line 1381 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1382 {
1383 
1384  std::lock_guard<std::mutex> lock(monlock_);
1385  auto itr = sourceEventsReport_.find(lumi);
1386  if (itr!=sourceEventsReport_.end())
1387  itr->second+=events;
1388  else
1390 }
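reportEventsThisLumiInSource() accumulates a per-lumisection event count in a map while holding a mutex. The following is a minimal sketch of that pattern, not the CMSSW code: `LumiEventCounter`, `report`, and `get` are hypothetical names. The `get` method is loosely modeled on the signature of `getEventReport(unsigned int lumi, bool erase)`; its body is an assumption, since that function's implementation is not shown in this section.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the sourceEventsReport_ / monlock_ pair.
class LumiEventCounter {
public:
  // Add events to the running total for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] += events;  // operator[] creates a zero entry if lumi is new
  }

  // Assumed semantics: return {found, count}; optionally drop the entry.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0u};
    const unsigned int events = itr->second;
    if (erase) counts_.erase(itr);
    return {true, events};
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Using `operator[]` collapses the find/insert branches of the listing into one statement; both forms give the same totals.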
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::InputSource.

Definition at line 907 of file FedRawDataInputSource.cc.

908 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1267 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by getNextEvent().

1268 {
1269  quit_threads_=true;
1270  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1271 
1272 }

Friends And Related Function Documentation

friend struct InputChunk
friend

Definition at line 44 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend struct InputFile
friend

Definition at line 43 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by read().

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 137 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::currentLumiSection_
private

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 85 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 136 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private

unsigned int FedRawDataInputSource::eventChunkSize_
private

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 105 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 162 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private

evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 106 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 90 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

unsigned int FedRawDataInputSource::readBlocks_
private

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and FedRawDataInputSource().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private

std::mutex FedRawDataInputSource::startupLock_
private

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private