FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
FedRawDataInputSource -> edm::RawInputSource -> edm::InputSource -> edm::ProductRegistryHelper (derived class first)

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr
< ProductRegistry > & 
productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirer * resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int,
unsigned int > 
sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair< InputFile *, InputChunk * > FedRawDataInputSource::ReaderInfo
private

Definition at line 136 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListMode_, filesToDelete_, fms_, freeChunks_, i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, cppFunctionSkipper::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
71  eventID_(),
74  tcds_pointer_(0),
75  eventsThisLumi_(0),
76  dpd_(nullptr)
77 {
78  char thishost[256];
79  gethostname(thishost, 255);
80  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
81  << std::endl << (eventChunkSize_/1048576)
82  << " MB on host " << thishost;
83 
84  long autoRunNumber = -1;
85  if (fileListMode_) {
86  autoRunNumber = initFileList();
87  if (autoRunNumber<0)
88  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
89  //override run number
90  runNumber_ = (edm::RunNumber_t)autoRunNumber;
91  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
92  }
93 
95  setNewRun();
96  //todo:autodetect from file name (assert if names differ)
99 
100  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
101  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
102  struct stat statbuf;
103  if (stat(defPath_.c_str(), &statbuf)) {
104  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
105  if (stat(defPath_.c_str(), &statbuf)) {
106  defPath_ = defPathSuffix;
107  }
108  }
109 
110  dpd_ = new DataPointDefinition();
111  std::string defLabel = "data";
112  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
113 
114  //make sure that chunk size is N * block size
119 
120  if (!numBuffers_)
121  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
122  "no reading enabled with numBuffers parameter 0";
123 
127 
128  if (!crc32c_hw_test())
129  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
130 
131  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
132  if (fileListMode_) {
133  try {
135  } catch(...) {}
136  }
137  else {
139  if (!fms_) {
140  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
141  }
142  }
143 
145  if (!daqDirector_)
146  throw cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
147 
148  //set DaqDirector to delete files in preGlobalEndLumi callback
150  if (fms_) {
152  fms_->setInputSource(this);
155  }
156  //should delete chunks when run stops
157  for (unsigned int i=0;i<numBuffers_;i++) {
159  }
160 
161  quit_threads_ = false;
162 
163  for (unsigned int i=0;i<numConcurrentReads_;i++)
164  {
165  std::unique_lock<std::mutex> lk(startupLock_);
166  //issue a memory fence here and in threads (constructor was segfaulting without this)
167  thread_quit_signal.push_back(false);
168  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
169  cvReader_.push_back(new std::condition_variable);
170  tid_active_.push_back(0);
171  threadInit_.store(false,std::memory_order_release);
172  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
173  startupCv_.wait(lk);
174  }
175 
176  runAuxiliary()->setProcessHistoryID(processHistoryID_);
177 }
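
The constructor listing above resolves the location of budef.jsd by trying CMSSW_BASE first, then CMSSW_RELEASE_BASE, and finally the bare relative path. The following is a minimal sketch of that fallback chain, not the class's own code: `resolveDefPath` is a hypothetical helper, and unlike the listing it skips environment variables that are unset instead of passing a possibly null `getenv()` result to `std::string`.

```cpp
#include <cstdlib>
#include <string>
#include <vector>
#include <sys/stat.h>

// Hypothetical helper, not part of FedRawDataInputSource.
// Tries "<value of each environment variable>/<suffix>" in order and returns
// the first candidate that exists on disk. Falls back to the bare suffix
// when no candidate is found.
std::string resolveDefPath(const std::vector<std::string>& envVars,
                           const std::string& suffix) {
  for (const std::string& var : envVars) {
    const char* base = std::getenv(var.c_str());
    if (base == nullptr) continue;  // unset variable: skip it
    const std::string candidate = std::string(base) + "/" + suffix;
    struct stat statbuf;
    // stat() returns 0 when the path exists and is accessible
    if (stat(candidate.c_str(), &statbuf) == 0) return candidate;
  }
  return suffix;
}
```

Under these assumptions, a call such as `resolveDefPath({"CMSSW_BASE", "CMSSW_RELEASE_BASE"}, "src/EventFilter/Utilities/plugins/budef.jsd")` would reproduce the lookup order shown in the listing.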
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 179 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

180 {
181  quit_threads_=true;
182 
183  //delete any remaining open files
184  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
185  deleteFile(it->second->fileName_);
186  delete it->second;
187  }
189  readSupervisorThread_->join();
190  }
191  else {
192  //join aux threads in case the supervisor thread was not started
193  for (unsigned int i=0;i<workerThreads_.size();i++) {
194  std::unique_lock<std::mutex> lk(mReader_);
195  thread_quit_signal[i]=true;
196  cvReader_[i]->notify_one();
197  lk.unlock();
198  workerThreads_[i]->join();
199  delete workerThreads_[i];
200  }
201  }
202  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
203  /*
204  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
205  InputChunk *ch;
206  while (!freeChunks_.try_pop(ch)) {}
207  delete ch;
208  }
209  */
210 }
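
The destructor listing above stops each worker by setting its quit flag under a mutex, notifying the worker's condition variable, and then joining the thread. Below is a minimal, self-contained sketch of that shutdown pattern. `WorkerPool` and its members are hypothetical stand-ins, the workers do no real reading, and the sketch owns its threads and condition variables through standard containers and smart pointers rather than the raw pointers used in the listing.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the worker bookkeeping of an input source.
class WorkerPool {
public:
  explicit WorkerPool(std::size_t n) : quit_(n, 0) {
    // Create all condition variables before starting any thread,
    // so the vector is never resized while workers use it.
    for (std::size_t i = 0; i < n; ++i)
      cv_.push_back(std::make_unique<std::condition_variable>());
    for (std::size_t i = 0; i < n; ++i)
      threads_.emplace_back(&WorkerPool::run, this, i);
  }
  ~WorkerPool() { shutdown(); }

  // Signal every worker to quit, then wait for it to finish.
  // Safe to call more than once.
  void shutdown() {
    for (std::size_t i = 0; i < threads_.size(); ++i) {
      {
        std::lock_guard<std::mutex> lk(m_);
        quit_[i] = 1;
      }
      cv_[i]->notify_one();
      if (threads_[i].joinable()) threads_[i].join();
    }
  }

  std::size_t finished() const {
    std::lock_guard<std::mutex> lk(m_);
    return finished_;
  }

private:
  void run(std::size_t i) {
    std::unique_lock<std::mutex> lk(m_);
    // The predicate guards against spurious and missed wake-ups.
    cv_[i]->wait(lk, [this, i] { return quit_[i] != 0; });
    ++finished_;
  }

  mutable std::mutex m_;
  std::vector<char> quit_;  // one quit flag per worker, guarded by m_
  std::vector<std::unique_ptr<std::condition_variable>> cv_;
  std::size_t finished_ = 0;
  std::vector<std::thread> threads_;
};
```

Waiting with a predicate means a worker that has not yet reached `wait()` when the flag is set still exits promptly, which is why the notification can safely happen after the lock is released.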

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 229 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fms_, newFWLiteAna::found, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

230 {
232  {
233  //late init of directory variable
235 
236  //this thread opens new files and dispatches reading to worker readers
237  //threadInit_.store(false,std::memory_order_release);
238  std::unique_lock<std::mutex> lk(startupLock_);
239  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
241  startupCv_.wait(lk);
242  }
243  //signal hltd to start event accounting
247  switch (nextEvent() ) {
249  //maybe create EoL file in working directory before ending run
250  struct stat buf;
251  if ( currentLumiSection_ > 0) {
252  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
253  if (eolFound) {
255  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
256  if ( !found ) {
258  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
259  close(eol_fd);
261  }
262  }
263  }
264  //also create EoR file in FU data directory
265  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
266  if (!eorFound) {
267  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
268  close(eor_fd);
269  }
271  eventsThisLumi_=0;
273  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
274  return false;
275  }
277  //this is not reachable
278  return true;
279  }
281  //std::cout << "--------------NEW LUMI---------------" << std::endl;
282  return true;
283  }
284  default: {
285  if (!getLSFromFilename_) {
286  //get new lumi from file header
287  if (event_->lumi() > currentLumiSection_) {
289  eventsThisLumi_=0;
290  maybeOpenNewLumiSection( event_->lumi() );
291  }
292  }
293  eventRunNumber_=event_->run();
294  L1EventID_ = event_->event();
295 
296  setEventCached();
297 
298  return true;
299  }
300  }
301 }
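
In the listing above, checkNextEvent() takes startupLock_, launches the supervisor thread, and blocks on startupCv_ until that thread reports that it is up. The following sketch shows one way to write such a start-up handshake. `StartupHandshake` and `startAndWait` are hypothetical names, and the explicit `ready` flag is an addition of this sketch: it protects the wait against spurious wake-ups, which a bare `wait(lk)` does not.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical state shared between the launching thread and the new thread.
struct StartupHandshake {
  std::mutex lock;
  std::condition_variable cv;
  bool ready = false;  // guarded by lock
};

// Starts `body` on a new thread and returns only after that thread has
// signalled that it is running. The caller must join the returned thread.
template <typename Body>
std::thread startAndWait(StartupHandshake& hs, Body body) {
  std::unique_lock<std::mutex> lk(hs.lock);
  std::thread t([&hs, body]() mutable {
    {
      // Blocks until the launcher is inside wait(), which releases the lock.
      std::lock_guard<std::mutex> guard(hs.lock);
      hs.ready = true;
    }
    hs.cv.notify_one();
    body();
  });
  hs.cv.wait(lk, [&hs] { return hs.ready; });
  return t;
}
```

Because the launcher holds the lock while constructing the thread, the new thread cannot signal before the launcher is waiting, so the notification cannot be lost.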
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 303 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

304 {
305  //used for backpressure mechanisms and monitoring
306  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
307  struct stat buf;
308  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
309  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
310  close(bol_fd);
311  }
312 }
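
createBoLSFile() above creates an empty marker file with open() and O_CREAT, optionally skipping the call when the file already exists. The sketch below isolates that pattern in a hypothetical helper, `touchMarkerFile`. As an assumption of this sketch, it also checks the descriptor returned by open() before closing it, whereas the listing passes the result straight to close().

```cpp
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <string>

// Hypothetical helper: create an empty marker file at `path`.
// When checkIfExists is true and the file is already present, nothing is done.
// Returns true if the file exists after the call.
bool touchMarkerFile(const std::string& path, bool checkIfExists) {
  struct stat buf;
  if (checkIfExists && stat(path.c_str(), &buf) == 0) return true;

  // Read/write for user, group and others, as in the listing
  // (the process umask still applies).
  const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
  const int fd = open(path.c_str(), O_RDWR | O_CREAT, mode);
  if (fd < 0) return false;  // e.g. missing directory or no permission
  close(fd);
  return true;
}
```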
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 635 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, fed_dqm_sourceclient-live_cfg::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

636 {
637  //no deletion in file list mode
638  if (fileListMode_) return;
639 
640  const boost::filesystem::path filePath(fileName);
641  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
642  try {
643  //sometimes this fails but file gets deleted
644  boost::filesystem::remove(filePath);
645  }
646  catch (const boost::filesystem::filesystem_error& ex)
647  {
648  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
649  << ". Trying again.";
650  usleep(100000);
651  try {
652  boost::filesystem::remove(filePath);
653  }
654  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
655  }
656  catch (std::exception& ex)
657  {
658  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
659  << ". Trying again.";
660  usleep(100000);
661  try {
662  boost::filesystem::remove(filePath);
663  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
664  }
665 }
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 74 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 212 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

213 {
215  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
216  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
217  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
218  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
219  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
220  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
221  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
222  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
223  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
224  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
225  desc.setAllowAnything();
226  descriptions.add("source", desc);
227 }
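
The parameter comments above state that eventChunkBlock must be smaller than or equal to eventChunkSize, and the constructor multiplies both values by 1048576 to convert megabytes to bytes. The sketch below works through that arithmetic. `makeChunkLayout` and `ChunkLayout` are hypothetical, and rejecting a chunk size that is not a whole multiple of the block size is an assumption of this sketch; how the class itself enforces its "chunk size is N * block size" rule is not shown in this section.

```cpp
#include <cstdint>
#include <stdexcept>

// 2^20, the megabyte-to-byte factor used in the constructor.
constexpr std::uint64_t kBytesPerMB = 1048576;

// Hypothetical summary of a validated buffer configuration.
struct ChunkLayout {
  std::uint64_t chunkBytes;      // size of one read-ahead buffer
  std::uint64_t blockBytes;      // size of one file read call
  std::uint64_t blocksPerChunk;  // read calls needed to fill a buffer
};

// Converts the two parameters from MB to bytes after checking that the block
// fits into the chunk a whole number of times.
ChunkLayout makeChunkLayout(unsigned int eventChunkSizeMB, unsigned int eventChunkBlockMB) {
  if (eventChunkBlockMB == 0 || eventChunkBlockMB > eventChunkSizeMB)
    throw std::invalid_argument("block size must be non-zero and not larger than chunk size");
  if (eventChunkSizeMB % eventChunkBlockMB != 0)
    throw std::invalid_argument("chunk size must be a whole multiple of block size");
  return {eventChunkSizeMB * kBytesPerMB,
          eventChunkBlockMB * kBytesPerMB,
          static_cast<std::uint64_t>(eventChunkSizeMB / eventChunkBlockMB)};
}
```

With the defaults listed above (32 and 32), this yields a 33554432-byte chunk that is filled by a single read call.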
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData)
private

Definition at line 740 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), l1tstage2_dqm_sourceclient-live_cfg::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, and tcds_pointer_.

Referenced by read().

741 {
742  edm::TimeValue_t time;
743  timeval stv;
744  gettimeofday(&stv,0);
745  time = stv.tv_sec;
746  time = (time << 32) + stv.tv_usec;
747  edm::Timestamp tstamp(time);
748 
749  uint32_t eventSize = event_->eventSize();
750  char* event = (char*)event_->payload();
751  GTPEventID_=0;
752  tcds_pointer_ = 0;
753  while (eventSize > 0) {
754  assert(eventSize>=sizeof(fedt_t));
755  eventSize -= sizeof(fedt_t);
756  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
757  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
758  assert(eventSize>=fedSize - sizeof(fedt_t));
759  eventSize -= (fedSize - sizeof(fedt_t));
760  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
761  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
762  if(fedId>FEDNumbering::MAXFEDID)
763  {
764  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
765  }
766  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
767  tcds_pointer_ = (unsigned char *)(event + eventSize );
768  }
769  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
770  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
771  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
772  else
773  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
774  //evf::evtn::evm_board_setformat(fedSize);
775  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
776  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
777  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
778  }
779  //take event ID from GTPE FED
780  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
781  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
782  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
783  }
784  }
785  FEDRawData& fedData = rawData.FEDData(fedId);
786  fedData.resize(fedSize);
787  memcpy(fedData.data(), event + eventSize, fedSize);
788  }
789  assert(eventSize == 0);
790 
791  return tstamp;
792 }
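
fillFEDRawDataCollection() above walks the event payload from the end towards the start: it reads a FED trailer, takes the fragment length from it (counted in 8-byte words), steps back to the matching header, and reads the source ID there. The sketch below reproduces that backward walk on a plain byte buffer. The struct layouts and bit masks are simplified assumptions for illustration and are not taken from fed_header.h or fed_trailer.h. The sketch also replaces the listing's asserts with explicit bounds checks that throw.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

// Simplified, assumed 8-byte header and trailer layouts.
struct FedHeader  { std::uint32_t sourceid;  std::uint32_t eventid;   };
struct FedTrailer { std::uint32_t conscheck; std::uint32_t eventsize; };

// Assumed bit fields: fragment length in the low 24 bits of the trailer word,
// source ID in bits 8..19 of the header word.
inline std::uint32_t fragmentWords(const FedTrailer& t) { return t.eventsize & 0x00ffffffu; }
inline std::uint16_t sourceId(const FedHeader& h) {
  return static_cast<std::uint16_t>((h.sourceid >> 8) & 0x0fffu);
}

// Splits a concatenation of FED fragments into one byte vector per source ID,
// scanning from the last fragment to the first.
std::map<std::uint16_t, std::vector<unsigned char>>
splitByFed(const unsigned char* payload, std::size_t size) {
  std::map<std::uint16_t, std::vector<unsigned char>> out;
  std::size_t remaining = size;
  while (remaining > 0) {
    if (remaining < sizeof(FedTrailer)) throw std::runtime_error("truncated trailer");
    FedTrailer trailer;
    std::memcpy(&trailer, payload + remaining - sizeof(FedTrailer), sizeof(trailer));
    // The trailer counts the whole fragment in 8-byte words.
    const std::size_t fedSize = static_cast<std::size_t>(fragmentWords(trailer)) << 3;
    if (fedSize < sizeof(FedHeader) + sizeof(FedTrailer) || fedSize > remaining)
      throw std::runtime_error("inconsistent fragment size");
    remaining -= fedSize;  // now points at the fragment's header
    FedHeader header;
    std::memcpy(&header, payload + remaining, sizeof(header));
    out[sourceId(header)].assign(payload + remaining, payload + remaining + fedSize);
  }
  return out;
}
```

Checking `fedSize` against `remaining` before subtracting avoids the unsigned wrap-around that a malformed trailer could otherwise cause.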
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1391 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1392 {
1393  std::lock_guard<std::mutex> lock(monlock_);
1394  auto itr = sourceEventsReport_.find(lumi);
1395  if (itr!=sourceEventsReport_.end()) {
1396  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1397  if (erase)
1398  sourceEventsReport_.erase(itr);
1399  return ret;
1400  }
1401  else
1402  return std::pair<bool,unsigned int>(false,0);
1403 }
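getEventReport() is a lookup under a mutex with an optional erase of the entry it found. The following sketch isolates that pattern in a small standalone class. The class name `EventReportTable` and its `report` method are hypothetical and are not part of FedRawDataInputSource.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical standalone table of per-lumi event counts.
class EventReportTable {
public:
  // Records (or overwrites) the event count for a lumi section.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns {true, count} if the lumi is known, {false, 0} otherwise.
  // With erase == true the entry is removed after its value is copied.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = counts_.find(lumi);
    if (it == counts_.end()) return std::pair<bool, unsigned int>(false, 0);
    std::pair<bool, unsigned int> result(true, it->second);
    if (erase) counts_.erase(it);
    return result;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Copying the value into `result` before erasing matters: after `erase(it)` the iterator is invalid and must not be dereferenced.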
std::map< unsigned int, unsigned int > sourceEventsReport_
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1434 of file FedRawDataInputSource.cc.

References fileListIndex_, MillePedeFileConverter_cfg::fileName, fileNames_, evf::EvFDaqDirector::newFile, fed_dqm_sourceclient-live_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1435 {
1436  if (fileListIndex_ < fileNames_.size()) {
1437  nextFile = fileNames_[fileListIndex_];
1438  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1439  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1440  boost::filesystem::path fileName = nextFile;
1441  std::string fileStem = fileName.stem().string();
1442  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1443  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1444  ls = boost::lexical_cast<unsigned int>(fileStem);
1445  //fsize = 0;
1446  //lockWaitTime = 0;
1447  fileListIndex_++;
1449  }
1450  else
1452 }
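The listing derives the lumi section number from the part of the file stem that follows "ls" and precedes the next underscore. One point worth noting when reading it: `std::string::find` returns `std::string::npos` when nothing matches and 0 when the match is at the start, so testing its return value for truthiness is not a "found" test. The sketch below shows the same extraction with explicit `npos` comparisons. The function name `lumiFromStem` and the example stem `run000123_ls0045_index000001` are assumptions made for illustration.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Extracts the number following "ls" in a stem such as
// "run000123_ls0045_index000001" (an assumed naming pattern).
// Returns false if no "ls" field or no valid number is present.
bool lumiFromStem(const std::string& stem, unsigned int& ls) {
  const std::string::size_type pos = stem.find("ls");
  if (pos == std::string::npos) return false;  // explicit "not found" check
  const std::string::size_type begin = pos + 2;
  std::string::size_type end = stem.find('_', begin);
  if (end == std::string::npos) end = stem.size();
  const std::string digits = stem.substr(begin, end - begin);
  if (digits.empty() ||
      digits.find_first_not_of("0123456789") != std::string::npos)
    return false;
  try {
    ls = static_cast<unsigned int>(std::stoul(digits));
  } catch (const std::exception&) {
    return false;  // e.g. value out of range
  }
  return true;
}
```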
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 365 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, streamFileTrackerPtr_, threadError(), mps_check::timeout, evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

366 {
367 
369  if (!currentFile_)
370  {
371  if (!streamFileTrackerPtr_) {
375  }
376 
379  if (!fileQueue_.try_pop(currentFile_))
380  {
381  //sleep until wakeup (only in single-buffer mode) or timeout
382  std::unique_lock<std::mutex> lkw(mWakeup_);
383  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
385  }
386  status = currentFile_->status_;
387  if ( status == evf::EvFDaqDirector::runEnded)
388  {
390  delete currentFile_;
391  currentFile_=nullptr;
392  return status;
393  }
394  else if ( status == evf::EvFDaqDirector::runAbort)
395  {
396  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
397  }
398  else if (status == evf::EvFDaqDirector::newLumi)
399  {
401  if (getLSFromFilename_) {
404  eventsThisLumi_=0;
406  }
407  }
408  else {//let this be picked up from next event
410  }
411 
412  delete currentFile_;
413  currentFile_=nullptr;
414  return status;
415  }
416  else if (status == evf::EvFDaqDirector::newFile) {
419  }
420  else
421  assert(0);
422  }
424 
425  //file is empty
426  if (!currentFile_->fileSize_) {
428  //try to open new lumi
430  if (getLSFromFilename_)
433  eventsThisLumi_=0;
435  }
436  //immediately delete empty file
438  delete currentFile_;
439  currentFile_=nullptr;
441  }
442 
443  //file is finished
446  //release last chunk (it is never released elsewhere)
449  {
450  throw cms::Exception("FedRawDataInputSource::getNextEvent")
451  << "Fully processed " << currentFile_->nProcessed_
452  << " from the file " << currentFile_->fileName_
453  << " but according to BU JSON there should be "
454  << currentFile_->nEvents_ << " events";
455  }
456  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
457  if (singleBufferMode_) {
458  std::unique_lock<std::mutex> lkw(mWakeup_);
459  cvWakeup_.notify_one();
460  }
463  //put the file in pending delete list;
464  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
465  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
466  }
467  else {
468  //in single-thread and stream jobs, events are already processed
470  delete currentFile_;
471  }
472  currentFile_=nullptr;
474  }
475 
476 
477  //file is too short
479  {
480  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
481  "Premature end of input file while reading event header";
482  }
483  if (singleBufferMode_) {
484 
485  //should already be there
488  usleep(10000);
490  }
492 
493  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
494 
495  //conditions when read amount is not sufficient for the header to fit
496  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
498  {
500 
501  if (detectedFRDversion_==0) {
502  detectedFRDversion_=*((uint32*)dataPosition);
503  if (detectedFRDversion_>5)
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Unknown FRD version -: " << detectedFRDversion_;
506  assert(detectedFRDversion_>=1);
507  }
508 
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
511  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
512  {
513  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
514  "Premature end of input file while reading event header";
515  }
516  }
517 
518  event_.reset( new FRDEventMsgView(dataPosition) );
519  if (event_->size()>eventChunkSize_) {
520  throw cms::Exception("FedRawDataInputSource::getNextEvent")
521  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
522  << " run:" << event_->run() << " of size:" << event_->size()
523  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
524  }
525 
526  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
527 
529  {
530  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
531  "Premature end of input file while reading event data";
532  }
533  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
535  //recalculate chunk position
536  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
537  event_.reset( new FRDEventMsgView(dataPosition) );
538  }
539  currentFile_->bufferPosition_ += event_->size();
540  currentFile_->chunkPosition_ += event_->size();
541  //last chunk is released when this function is invoked next time
542 
543  }
544  //multibuffer mode:
545  else
546  {
547  //wait for the current chunk to become added to the vector
550  usleep(10000);
552  }
554 
555  //check if header is at the boundary of two chunks
556  chunkIsFree_ = false;
557  unsigned char *dataPosition;
558 
559  //read header, copy it to a single chunk if necessary
560  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
561 
562  event_.reset( new FRDEventMsgView(dataPosition) );
563  if (event_->size()>eventChunkSize_) {
564  throw cms::Exception("FedRawDataInputSource::getNextEvent")
565  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
566  << " run:" << event_->run() << " of size:" << event_->size()
567  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
568  }
569 
570  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
571 
573  {
574  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
575  "Premature end of input file while reading event data";
576  }
577 
578  if (chunkEnd) {
579  //header was at the chunk boundary, we will have to move payload as well
580  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
581  chunkIsFree_ = true;
582  }
583  else {
584  //header was contiguous, but check if payload fits the chunk
585  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
586  //rewind to header start position
587  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
588  //copy event to a chunk start and move pointers
589  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
590  assert(chunkEnd);
591  chunkIsFree_=true;
592  //header is moved
593  event_.reset( new FRDEventMsgView(dataPosition) );
594  }
595  else {
596  //everything is in a single chunk, only move pointers forward
597  chunkEnd = currentFile_->advance(dataPosition,msgSize);
598  assert(!chunkEnd);
599  chunkIsFree_=false;
600  }
601  }
602  }//end multibuffer mode
604 
605  if (verifyChecksum_ && event_->version() >= 5)
606  {
607  uint32_t crc=0;
608  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
609  if ( crc != event_->crc32c() ) {
611  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
612  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
613  " but calculated 0x" << crc;
614  }
615  }
616  else if ( verifyAdler32_ && event_->version() >= 3)
617  {
618  uint32_t adler = adler32(0L,Z_NULL,0);
619  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
620 
621  if ( adler != event_->adler32() ) {
623  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
624  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
625  " but calculated 0x" << adler;
626  }
627  }
629 
631 
633 }
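The last step of getNextEvent() recomputes a checksum over the event payload and compares it with the value stored in the event header. The listing uses zlib's `adler32` for this. The sketch below is a plain reference implementation of the Adler-32 algorithm, written only to illustrate what is being compared; it is not zlib's optimized routine, and the names `adler32Reference` and `payloadMatches` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Reference Adler-32: two running sums modulo 65521, packed as (b << 16) | a.
// For empty input the result is 1, which matches the initial value that
// zlib returns from adler32(0L, Z_NULL, 0).
std::uint32_t adler32Reference(const unsigned char* data, std::size_t len) {
  const std::uint32_t kMod = 65521;
  std::uint32_t a = 1;
  std::uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// True if the checksum computed over the payload equals the expected value.
bool payloadMatches(const unsigned char* payload, std::size_t len,
                    std::uint32_t expected) {
  return adler32Reference(payload, len) == expected;
}
```

As in the listing, a mismatch would typically be treated as a fatal input error rather than silently skipped.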
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 794 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, AlCaHLTBitMon_QueryRunRegistry::data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, alignCSCRings::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), fed_dqm_sourceclient-live_cfg::path, matplotRender::reader, MatrixUtil::remove(), AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

795 {
797  try {
798  // assemble json destination path
800 
801  //TODO:should be ported to use fffnaming
802  std::ostringstream fileNameWithPID;
803  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
804  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
805  jsonDestPath /= fileNameWithPID.str();
806 
807  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
808  << jsonDestPath;
809  try {
810  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
811  }
812  catch (const boost::filesystem::filesystem_error& ex)
813  {
814  // Input dir gone?
815  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
816  // << " Maybe the file is not yet visible by FU. Trying again in one second";
817  sleep(1);
818  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
819  }
821 
822  try {
823  //sometimes this fails but file gets deleted
824  boost::filesystem::remove(jsonSourcePath);
825  }
826  catch (const boost::filesystem::filesystem_error& ex)
827  {
828  // Input dir gone?
829  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
830  }
831  catch (std::exception& ex)
832  {
833  // Input dir gone?
834  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
835  }
836 
837  boost::filesystem::ifstream ij(jsonDestPath);
838  Json::Value deserializeRoot;
840 
841  if (!reader.parse(ij, deserializeRoot))
842  throw std::runtime_error("Cannot deserialize input JSON file");
843 
844  //read BU JSON
846  DataPoint dp;
847  dp.deserialize(deserializeRoot);
848  bool success = false;
849  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
850  if (dpd_->getNames().at(i)=="NEvents")
851  if (i<dp.getData().size()) {
852  data = dp.getData()[i];
853  success=true;
854  }
855  }
856  if (!success) {
857  if (dp.getData().size())
858  data = dp.getData()[0];
859  else
860  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
861  " error reading number of events from BU JSON -: No input value " << data;
862  }
863  return boost::lexical_cast<int>(data);
864 
865  }
866  catch (const boost::filesystem::filesystem_error& ex)
867  {
868  // Input dir gone?
870  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
871  }
872  catch (std::runtime_error e)
873  {
874  // Another process grabbed the file and NFS did not register this
876  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
877  }
878 
879  catch( boost::bad_lexical_cast const& ) {
880  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
881  << "Input value is -: " << data;
882  }
883 
884  catch (std::exception e)
885  {
886  // BU run directory disappeared?
888  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
889  }
890 
891  return -1;
892 }
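After the JSON file is parsed, the listing looks up the value whose name is "NEvents", falls back to the first value if that name is absent, and converts the result to an integer. A minimal sketch of that selection logic follows, using plain vectors in place of the jsoncollector types. The function name `eventCount` is hypothetical, and returning -1 on every failure is a simplification (the listing throws when no value is present at all).

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <string>
#include <vector>

// Picks the "NEvents" entry (by position in names), falling back to the
// first value, and parses it as an int. Returns -1 if nothing usable exists.
int eventCount(const std::vector<std::string>& names,
               const std::vector<std::string>& values) {
  std::string data;
  bool found = false;
  for (std::size_t i = 0; i < names.size() && i < values.size(); ++i) {
    if (names[i] == "NEvents") {
      data = values[i];
      found = true;
    }
  }
  if (!found) {
    if (values.empty()) return -1;  // no input value at all
    data = values[0];               // fallback used by the listing
  }
  try {
    std::size_t used = 0;
    const int n = std::stoi(data, &used);
    return used == data.size() ? n : -1;  // reject trailing garbage
  } catch (const std::exception&) {
    return -1;  // not a number, or out of range
  }
}
```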
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1405 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, fed_dqm_sourceclient-live_cfg::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1406 {
1407  std::sort(fileNames_.begin(),fileNames_.end(),
1408  [](std::string a, std::string b) {
1409  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1410  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1411  return b > a;});
1412 
1413  if (fileNames_.size()) {
1414  //get run number from first file in the vector
1416  std::string fileStem = fileName.stem().string();
1417  auto end = fileStem.find("_");
1418  if (fileStem.find("run")==0) {
1419  std::string runStr = fileStem.substr(3,end-3);
1420  try {
1421  //get long to support test run numbers < 2^32
1422  long rval = boost::lexical_cast<long>(runStr);
1423  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1424  return rval;
1425  }
1426  catch( boost::bad_lexical_cast const& ) {
1427  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1428  }
1429  }
1430  }
1431  return -1;
1432 }
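initFileList() does two things: it orders the input files by the name after the last "/" so that directories do not affect the order, and it reads the run number from the "run" prefix of the first file stem. The sketch below shows both steps as small free functions. The helper names and the example file names are assumptions, and the comparison here strips the leading slash, which the listing does not.

```cpp
#include <algorithm>
#include <cassert>
#include <exception>
#include <string>
#include <vector>

// Returns the part of a path after the last '/', or the whole string.
std::string baseName(const std::string& path) {
  const std::string::size_type p = path.rfind('/');
  return p == std::string::npos ? path : path.substr(p + 1);
}

// Sorts paths by file name only, ignoring the directory part.
void sortByBaseName(std::vector<std::string>& files) {
  std::sort(files.begin(), files.end(),
            [](const std::string& a, const std::string& b) {
              return baseName(a) < baseName(b);
            });
}

// Reads the run number from a stem of the assumed form "run<digits>_...".
// Returns -1 if the stem does not match, mirroring the listing's fallback.
long runFromStem(const std::string& stem) {
  if (stem.compare(0, 3, "run") != 0) return -1;
  const std::string::size_type end = stem.find('_');
  const std::string digits =
      stem.substr(3, end == std::string::npos ? std::string::npos : end - 3);
  if (digits.empty() ||
      digits.find_first_not_of("0123456789") != std::string::npos)
    return -1;
  try {
    return std::stol(digits);
  } catch (const std::exception&) {
    return -1;
  }
}
```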
std::vector< std::string > fileNames_
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 314 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

315 {
317  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
318 
319  if ( currentLumiSection_ > 0) {
320  const std::string fuEoLS =
322  struct stat buf;
323  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
324  if ( !found ) {
326  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
327  close(eol_fd);
328  createBoLSFile(lumiSection,false);
330  }
331  }
332  else createBoLSFile(lumiSection,true);//needed for initial lumisection
333 
334  currentLumiSection_ = lumiSection;
335 
337 
338  timeval tv;
339  gettimeofday(&tv, 0);
340  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
341 
342  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
344  runAuxiliary()->run(),
345  lumiSection, lsopentime,
347 
348  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
349  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
350 
351  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
352  }
353 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:357
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:262
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:365
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:265
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 355 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNextEvent().

356 {
358  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
359  {
360  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
361  }
362  return status;
363 }
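nextEvent() keeps calling getNextEvent() while it reports that no file is available, and leaves the loop early if the global shutdown flag is set. The sketch below expresses that loop generically. The `Status` enum and the function name `pollUntilReady` are hypothetical stand-ins for `evf::EvFDaqDirector::FileStatus` and the member function.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the file status values used in the listing.
enum class Status { NoFile, NewFile, RunEnded };

// Calls poll() until it returns something other than NoFile, or until the
// shutdown flag is observed. The last status is returned in both cases.
template <typename Poll>
Status pollUntilReady(Poll poll, const std::atomic<bool>& shutdown) {
  Status status;
  while ((status = poll()) == Status::NoFile) {
    if (shutdown.load(std::memory_order_relaxed)) break;
  }
  return status;
}
```

The loop itself does not sleep. In the listing the waiting happens inside getNextEvent(), which blocks on a condition variable with a timeout before reporting that no file is available.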
volatile std::atomic< bool > shutdown_flag
evf::EvFDaqDirector::FileStatus getNextEvent()
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource > )
override private virtual

Definition at line 897 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

898 {
899  InputSource::rewind();
903 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:353
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 894 of file FedRawDataInputSource.cc.

895 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 668 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

669 {
671  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
672  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
673 
674  if (useL1EventID_){
676  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
678  aux.setProcessHistoryID(processHistoryID_);
679  makeEvent(eventPrincipal, aux);
680  }
681  else if(tcds_pointer_==0){
684  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
686  aux.setProcessHistoryID(processHistoryID_);
687  makeEvent(eventPrincipal, aux);
688  }
689  else{
690  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
693  processGUID());
695  makeEvent(eventPrincipal, aux);
696  }
697 
698 
699 
700  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
701 
702  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
703  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
704  // daqProvenanceHelper_.dummyProvenance_);
705 
706  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
708 
709  eventsThisLumi_++;
711 
712  //this old file check runs no more often than every 10 events
713  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
714  //delete files that are not in processing
715  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
716  auto it = filesToDelete_.begin();
717  while (it!=filesToDelete_.end()) {
718  bool fileIsBeingProcessed = false;
719  for (unsigned int i=0;i<nStreams_;i++) {
720  if (it->first == streamFileTrackerPtr_->at(i)) {
721  fileIsBeingProcessed = true;
722  break;
723  }
724  }
725  if (!fileIsBeingProcessed) {
726  deleteFile(it->second->fileName_);
727  delete it->second;
728  it = filesToDelete_.erase(it);
729  }
730  else it++;
731  }
732 
733  }
735  chunkIsFree_=false;
737  return;
738 }
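The cleanup at the end of read() walks a list of finished files and deletes each one that no stream is still using, erasing list entries during the walk. A minimal sketch of that erase-while-iterating pattern follows. `PendingFile` and `releaseUnused` are hypothetical names, and the sketch returns the released file names instead of deleting anything on disk.

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <string>
#include <vector>

// Hypothetical record of a finished file awaiting deletion.
struct PendingFile {
  int index;         // file index as tracked per stream
  std::string name;  // file name to release
};

// Removes every pending file whose index is not in use by any stream and
// returns the names of the files that were released.
std::vector<std::string> releaseUnused(std::list<PendingFile>& pending,
                                       const std::vector<int>& inUse) {
  std::vector<std::string> released;
  for (auto it = pending.begin(); it != pending.end();) {
    const bool busy =
        std::find(inUse.begin(), inUse.end(), it->index) != inUse.end();
    if (busy) {
      ++it;  // still referenced by a stream, keep it for a later pass
    } else {
      released.push_back(it->name);
      it = pending.erase(it);  // erase returns the next valid iterator
    }
  }
  return released;
}
```

In the listing the equivalent loop runs while holding `fileDeleteLock_`, because another thread appends to the same list.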
void setInState(FastMonitoringThread::InputState inputState)
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID)
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:217
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1326 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1327 {
1328 
1329  if (fileDescriptor_<0) {
1330  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1331  bufferInputRead_ = 0;
1332  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1333  if (fileDescriptor_>=0)
1334  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1335  else
1336  {
1337  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1338  << file->fileName_ << " fd:" << fileDescriptor_;
1339  }
1340  }
1341 
1342  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1343  uint32_t existingSize = 0;
1344  for (unsigned int i=0;i<readBlocks_;i++)
1345  {
1346  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1347  bufferInputRead_+=last;
1348  existingSize+=last;
1349  }
1350  }
1351  else {
1352  const uint32_t chunksize = file->chunkPosition_;
1353  const uint32_t blockcount=chunksize/eventChunkBlock_;
1354  const uint32_t leftsize = chunksize%eventChunkBlock_;
1355  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1356  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1357 
1358  for (uint32_t i=0;i<blockcount;i++) {
1359  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1360  bufferInputRead_+=last;
1361  existingSize+=last;
1362  }
1363  if (leftsize) {
1364  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1365  bufferInputRead_+=last;
1366  }
1367  file->chunkPosition_=0;//data was moved to beginning of the chunk
1368  }
1369  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1370  if (fileDescriptor_!=-1)
1371  {
1372  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1373  close(fileDescriptor_);
1374  fileDescriptor_=-1;
1375  }
1376  }
1377 }
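In single-buffer mode the function moves the unread tail of the chunk to the front of the buffer and then refills the space that was freed. The sketch below shows that shift-and-refill step against a `std::istream` instead of a file descriptor. The function name `shiftAndRefill` is hypothetical, and it performs a single read where the listing reads in blocks of `eventChunkBlock_` bytes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <istream>
#include <sstream>
#include <vector>

// buf   : fixed-size buffer
// valid : number of bytes in buf that currently hold data
// pos   : number of those bytes already consumed
// Moves the unread bytes [pos, valid) to the front, refills the rest of the
// buffer from `in`, and returns the new number of valid bytes.
std::size_t shiftAndRefill(std::vector<unsigned char>& buf, std::size_t valid,
                           std::size_t pos, std::istream& in) {
  assert(pos <= valid && valid <= buf.size());
  const std::size_t leftover = valid - pos;
  if (leftover > 0) {
    // memmove, not memcpy: source and destination may overlap
    std::memmove(buf.data(), buf.data() + pos, leftover);
  }
  in.read(reinterpret_cast<char*>(buf.data()) + leftover,
          static_cast<std::streamsize>(buf.size() - leftover));
  return leftover + static_cast<std::size_t>(in.gcount());
}
```

A caller would reset its read position to 0 after each call, which corresponds to the `chunkPosition_=0` assignment in the listing. Using the byte count actually read also handles short reads at the end of the input.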
#define LogDebug(id)
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 909 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, j, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, fed_dqm_sourceclient-live_cfg::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

910 {
911  bool stop=false;
912  unsigned int currentLumiSection = 0;
913  //threadInit_.exchange(true,std::memory_order_acquire);
914 
915  {
916  std::unique_lock<std::mutex> lk(startupLock_);
917  startupCv_.notify_one();
918  }
919 
920  uint32_t ls=0;
921  uint32_t monLS=1;
922  uint32_t lockCount=0;
923  uint64_t sumLockWaitTimeUs=0.;
924 
925  while (!stop) {
926 
927  //wait for at least one free thread and chunk
928  int counter=0;
930  {
931  //report state to monitoring
932  if (fms_) {
933  bool copy_active=false;
934  for (auto j : tid_active_) if (j) copy_active=true;
936  else if (freeChunks_.empty()) {
937  if (copy_active)
939  else
941  }
942  else {
943  if (copy_active)
945  else
947  }
948  }
949  std::unique_lock<std::mutex> lkw(mWakeup_);
950  //sleep until woken up by condition or a timeout
951  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
952  counter++;
953  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
954  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
955  }
956  else {
957  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
958  }
959  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
960  }
961 
962  if (stop) break;
963 
964  //look for a new file
965  std::string nextFile;
966  uint32_t fileSize;
967 
968  if (fms_) {
972  }
973 
975 
976  while (status == evf::EvFDaqDirector::noFile) {
977  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
978  stop=true;
979  break;
980  }
981 
982  uint64_t thisLockWaitTimeUs=0.;
983  if (fileListMode_) {
984  //return LS if LS not set, otherwise return file
985  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
986  }
987  else
988  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
989 
991 
992  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
993 
994  //monitoring of lock wait time
995  if (thisLockWaitTimeUs>0.)
996  sumLockWaitTimeUs+=thisLockWaitTimeUs;
997  lockCount++;
998  if (ls>monLS) {
999  monLS=ls;
1000  if (lockCount)
1001  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1002  lockCount=0;
1003  sumLockWaitTimeUs=0;
1004  }
1005 
1006  //check again for any remaining index/EoLS files after EoR file is seen
1007  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1009  usleep(100000);
1010  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1011  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1012  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1013  }
1014 
1015  if ( status == evf::EvFDaqDirector::runEnded) {
1017  stop=true;
1018  break;
1019  }
1020 
1021  //queue new lumisection
1022  if( getLSFromFilename_ && ls > currentLumiSection) {
1023  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1024  currentLumiSection = ls;
1025  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1026  }
1027 
1028  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1029  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1031  stop=true;
1032  break;
1033  }
1034 
1035  int dbgcount=0;
1036  if (status == evf::EvFDaqDirector::noFile) {
1038  dbgcount++;
1039  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1040  usleep(100000);
1041  }
1042  }
1043  if ( status == evf::EvFDaqDirector::newFile ) {
1045  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1046 
1047 
1048  boost::filesystem::path rawFilePath(nextFile);
1049  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1050 
1051  struct stat st;
1052  int stat_res = stat(rawFile.c_str(),&st);
1053  if (stat_res==-1) {
1054  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1055  setExceptionState_=true;
1056  stop=true;
1057  break;
1058  }
1059  fileSize=st.st_size;
1060 
1061  if (fms_) {
1065  }
1066  int eventsInNewFile;
1067  if (fileListMode_) {
1068  if (fileSize==0) eventsInNewFile=0;
1069  else eventsInNewFile=-1;
1070  }
1071  else {
1072  eventsInNewFile = grabNextJsonFile(nextFile);
1073  assert( eventsInNewFile>=0 );
1074  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1075  }
1076  //int eventsInNewFile = fileListMode_ ? -1 : grabNextJsonFile(nextFile);
1077  //if (fileListMode_ && fileSize==0) {eventsInNewFile=0;}
1078  //if (!fileListMode_) {
1079  // assert( eventsInNewFile>=0 );
1080  // assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1081  //}
1082 
1083  if (!singleBufferMode_) {
1084  //calculate number of needed chunks
1085  unsigned int neededChunks = fileSize/eventChunkSize_;
1086  if (fileSize%eventChunkSize_) neededChunks++;
1087 
1088  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1090  fileQueue_.push(newInputFile);
1091 
1092  for (unsigned int i=0;i<neededChunks;i++) {
1093 
1094  if (fms_) {
1095  bool copy_active=false;
1096  for (auto j : tid_active_) if (j) copy_active=true;
1097  if (copy_active)
1099  else
1101  }
1102  //get thread
1103  unsigned int newTid = 0xffffffff;
1104  while (!workerPool_.try_pop(newTid)) {
1105  usleep(100000);
1106  }
1107 
1108  if (fms_) {
1109  bool copy_active=false;
1110  for (auto j : tid_active_) if (j) copy_active=true;
1111  if (copy_active)
1113  else
1115  }
1116  InputChunk * newChunk = nullptr;
1117  while (!freeChunks_.try_pop(newChunk)) {
1118  usleep(100000);
1119  if (quit_threads_.load(std::memory_order_relaxed)) break;
1120  }
1121 
1122  if (newChunk == nullptr) {
1123  //return unused tid if we received shutdown (nullptr chunk)
1124  if (newTid!=0xffffffff) workerPool_.push(newTid);
1125  stop = true;
1126  break;
1127  }
1129 
1130  std::unique_lock<std::mutex> lk(mReader_);
1131 
1132  unsigned int toRead = eventChunkSize_;
1133  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1134  newChunk->reset(i*eventChunkSize_,toRead,i);
1135 
1136  workerJob_[newTid].first=newInputFile;
1137  workerJob_[newTid].second=newChunk;
1138 
1139  //wake up the worker thread
1140  cvReader_[newTid]->notify_one();
1141  }
1142  }
1143  else {
1144  if (!eventsInNewFile) {
1145  //still queue file for lumi update
1146  std::unique_lock<std::mutex> lkw(mWakeup_);
1147  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1149  fileQueue_.push(newInputFile);
1150  cvWakeup_.notify_one();
1151  return;
1152  }
1153  //in single-buffer mode put single chunk in the file and let the main thread read the file
1154  InputChunk * newChunk;
1155  //should be available immediately
1156  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1157 
1158  std::unique_lock<std::mutex> lkw(mWakeup_);
1159 
1160  unsigned int toRead = eventChunkSize_;
1161  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1162  newChunk->reset(0,toRead,0);
1163  newChunk->readComplete_=true;
1164 
1165  //push file and wakeup main thread
1166  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1167  newInputFile->chunks_[0]=newChunk;
1169  fileQueue_.push(newInputFile);
1170  cvWakeup_.notify_one();
1171  }
1172  }
1173  }
1175  //make sure threads finish reading
1176  unsigned numFinishedThreads = 0;
1177  while (numFinishedThreads < workerThreads_.size()) {
1178  unsigned int tid;
1179  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1180  std::unique_lock<std::mutex> lk(mReader_);
1181  thread_quit_signal[tid]=true;
1182  cvReader_[tid]->notify_one();
1183  numFinishedThreads++;
1184  }
1185  for (unsigned int i=0;i<workerThreads_.size();i++) {
1186  workerThreads_[i]->join();
1187  delete workerThreads_[i];
1188  }
1189 }
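
The chunk bookkeeping in readSupervisor() (neededChunks, and toRead for the last chunk) is a ceiling division followed by a remainder check. The sketch below isolates that arithmetic. ChunkSpan and planChunks are hypothetical names used only for illustration, and the sketch uses 64-bit offsets, which is an assumption and differs from the unsigned int offsets passed to InputChunk::reset().

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical illustration type: where a chunk starts in the file and how
// many bytes it holds.
struct ChunkSpan {
  uint64_t offset;
  uint32_t toRead;
};

// Split a file of `fileSize` bytes into chunks of at most `chunkSize` bytes.
// Every chunk is full except possibly the last one, which holds the remainder.
std::vector<ChunkSpan> planChunks(uint64_t fileSize, uint32_t chunkSize) {
  assert(chunkSize > 0);
  uint64_t needed = fileSize / chunkSize;
  if (fileSize % chunkSize) needed++;  // partial last chunk
  std::vector<ChunkSpan> spans;
  for (uint64_t i = 0; i < needed; i++) {
    uint32_t toRead = chunkSize;
    if (i == needed - 1 && fileSize % chunkSize)
      toRead = static_cast<uint32_t>(fileSize % chunkSize);
    spans.push_back({i * chunkSize, toRead});
  }
  return spans;
}
```

For a 10-byte file and 4-byte chunks this yields three spans at offsets 0, 4 and 8, holding 4, 4 and 2 bytes. An empty file yields no spans.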
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1191 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, mps_update::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, init, plotBeamSpotDB::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1192 {
1193  bool init = true;
1194  threadInit_.exchange(true,std::memory_order_acquire);
1195 
1196  while (1) {
1197 
1198  tid_active_[tid]=false;
1199  std::unique_lock<std::mutex> lk(mReader_);
1200  workerJob_[tid].first=nullptr;
1201  workerJob_[tid].second=nullptr;
1202 
1203  assert(!thread_quit_signal[tid]);//should never get it here
1204  workerPool_.push(tid);
1205 
1206  if (init) {
1207  std::unique_lock<std::mutex> lk(startupLock_);
1208  init = false;
1209  startupCv_.notify_one();
1210  }
1211  cvReader_[tid]->wait(lk);
1212 
1213  if (thread_quit_signal[tid]) return;
1214  tid_active_[tid]=true;
1215 
1216  InputFile * file;
1217  InputChunk * chunk;
1218 
1219  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1220 
1221  file = workerJob_[tid].first;
1222  chunk = workerJob_[tid].second;
1223 
1224  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1225  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1226 
1227 
1228  if (fileDescriptor>=0)
1229  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1230  else
1231  {
1232  edm::LogError("FedRawDataInputSource") <<
1233  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1234  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1235  setExceptionState_=true;
1236  return;
1237 
1238  }
1239 
1240  unsigned int bufferLeft = 0;
1242  for (unsigned int i=0;i<readBlocks_;i++)
1243  {
1244  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1245  if ( last > 0 )
1246  bufferLeft+=last;
1247  if (last < eventChunkBlock_) {
1248  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1249  break;
1250  }
1251  }
1253  auto diff = end-start;
1254  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1255  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1256  close(fileDescriptor);
1257 
1258  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1260  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing
1261  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1262 
1263  }
1264 }
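
readWorker() parks on cvReader_[tid]->wait(lk) until the supervisor assigns a job or sets the quit signal. A condition-variable wait without a predicate may also return on a spurious wakeup, so a common variant re-checks the state in a predicate. The sketch below shows that variant for a single worker. The Worker class and its members are hypothetical, use only the standard library (no tbb queues), and are not the CMSSW implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-worker mailbox; names are illustrative, not CMSSW API.
class Worker {
public:
  Worker() : thread_(&Worker::run, this) {}
  ~Worker() {
    {
      std::lock_guard<std::mutex> lk(m_);
      quit_ = true;  // plays the role of thread_quit_signal[tid]
    }
    cv_.notify_one();
    thread_.join();
  }
  // Hand a job to the worker; blocks until the previous job has finished.
  void submit(std::function<void()> job) {
    assert(job);
    std::unique_lock<std::mutex> lk(m_);
    idleCv_.wait(lk, [this] { return !job_; });
    job_ = std::move(job);
    cv_.notify_one();
  }
  // Block until no job is pending or running.
  void waitIdle() {
    std::unique_lock<std::mutex> lk(m_);
    idleCv_.wait(lk, [this] { return !job_; });
  }

private:
  void run() {
    std::unique_lock<std::mutex> lk(m_);
    while (true) {
      // The predicate makes spurious wakeups harmless.
      cv_.wait(lk, [this] { return quit_ || job_; });
      if (job_) {
        auto job = job_;  // job_ stays set while running, so waitIdle blocks
        lk.unlock();
        job();
        lk.lock();
        job_ = nullptr;
        idleCv_.notify_all();
        continue;
      }
      if (quit_) return;
    }
  }
  std::mutex m_;
  std::condition_variable cv_, idleCv_;
  std::function<void()> job_;
  bool quit_ = false;
  std::thread thread_;  // declared last so the other members exist before run() starts
};
```

A caller would construct a Worker, call submit() with each job, and call waitIdle() before reading results. The destructor signals quit and joins the thread, and a job that is still pending at that point runs before the thread exits.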
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int lumi, unsigned int events )
private

Definition at line 1380 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1381 {
1382 
1383  std::lock_guard<std::mutex> lock(monlock_);
1384  auto itr = sourceEventsReport_.find(lumi);
1385  if (itr!=sourceEventsReport_.end())
1386  itr->second+=events;
1387  else
1388  sourceEventsReport_[lumi]=events;
1389 }
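
The function above accumulates per-lumisection event counts in sourceEventsReport_ while holding monlock_. A minimal standalone sketch of that pattern follows. The class LumiEventReport is hypothetical, and the behaviour of its get() method, in particular what the erase flag does, is an assumption modelled on the getEventReport(lumi, erase) signature and is not taken from the CMSSW source.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the monlock_ / sourceEventsReport_ pair.
class LumiEventReport {
public:
  // Add `events` to the running total for `lumi`.
  void add(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] += events;  // operator[] value-initializes a missing entry to 0
  }
  // Returns {found, count}. If `erase` is true the entry is removed after
  // being read (assumed semantics, see the lead-in).
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0};
    const unsigned int n = itr->second;
    if (erase) counts_.erase(itr);
    return {true, n};
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Because operator[] creates a zero entry on first use, the find/else branch of the original collapses into a single += in this sketch.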
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 905 of file FedRawDataInputSource.cc.

906 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1266 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1267 {
1268  quit_threads_=true;
1269  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1270 
1271 }

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 172 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private

unsigned int FedRawDataInputSource::eventChunkSize_
private

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 165 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 176 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListMode_
private

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 164 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private

evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 182 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

unsigned int FedRawDataInputSource::readBlocks_
private

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and postForkReacquireResources().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private

std::mutex FedRawDataInputSource::startupLock_
private

std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private