FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
FedRawDataInputSource inherits edm::RawInputSource, which inherits edm::InputSource, which inherits edm::ProductRegistryHelper.

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
SharedResourcesAcquirer * resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< InputFile * > fileQueue_
 
std::list< std::pair< int, InputFile * > > filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef ProductRegistryHelper::TypeLabelList TypeLabelList
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair<InputFile*,InputChunk*> FedRawDataInputSource::ReaderInfo
private

Definition at line 138 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, cppFunctionSkipper::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
68  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
72  eventID_(),
75  tcds_pointer_(0),
76  eventsThisLumi_(0),
77  dpd_(nullptr)
78 {
79  char thishost[256];
80  gethostname(thishost, 255);
81  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
82  << std::endl << (eventChunkSize_/1048576)
83  << " MB on host " << thishost;
84 
85  long autoRunNumber = -1;
86  if (fileListMode_) {
87  autoRunNumber = initFileList();
88  if (!fileListLoopMode_) {
89  if (autoRunNumber<0)
90  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
91  //override run number
92  runNumber_ = (edm::RunNumber_t)autoRunNumber;
93  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
94  }
95  }
96 
98  setNewRun();
99  //todo:autodetect from file name (assert if names differ)
102 
103  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
104  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
105  struct stat statbuf;
106  if (stat(defPath_.c_str(), &statbuf)) {
107  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = defPathSuffix;
110  }
111  }
112 
113  dpd_ = new DataPointDefinition();
114  std::string defLabel = "data";
115  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
116 
117  //make sure that chunk size is N * block size
122 
123  if (!numBuffers_)
124  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
125  "no reading enabled with numBuffers parameter 0";
126 
130 
131  if (!crc32c_hw_test())
132  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
133 
134  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
135  if (fileListMode_) {
136  try {
138  } catch(...) {}
139  }
140  else {
142  if (!fms_) {
143  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
144  }
145  }
146 
148  if (!daqDirector_)
149  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
150 
151  //set DaqDirector to delete files in preGlobalEndLumi callback
153  if (fms_) {
155  fms_->setInputSource(this);
158  }
159  //should delete chunks when run stops
160  for (unsigned int i=0;i<numBuffers_;i++) {
162  }
163 
164  quit_threads_ = false;
165 
166  for (unsigned int i=0;i<numConcurrentReads_;i++)
167  {
168  std::unique_lock<std::mutex> lk(startupLock_);
169  //issue a memory fence here and in threads (constructor was segfaulting without this)
170  thread_quit_signal.push_back(false);
171  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
172  cvReader_.push_back(new std::condition_variable);
173  tid_active_.push_back(0);
174  threadInit_.store(false,std::memory_order_release);
175  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
176  startupCv_.wait(lk);
177  }
178 
179  runAuxiliary()->setProcessHistoryID(processHistoryID_);
180 }
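The loop at lines 166-177 starts one reader thread at a time and blocks on startupCv_ until that thread has come up. The following is a minimal sketch of that handshake using only the standard library. The class WorkerPool and its members are hypothetical names, not part of FedRawDataInputSource. Unlike the listing, the sketch waits with a predicate so that a spurious wakeup cannot release the constructor early.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the reader-thread bookkeeping; not the CMSSW class.
class WorkerPool {
public:
  explicit WorkerPool(unsigned int n) {
    for (unsigned int i = 0; i < n; ++i) {
      std::unique_lock<std::mutex> lk(lock_);
      initialized_ = false;
      workers_.emplace_back(&WorkerPool::workerMain, this, i);
      // Sleep until worker i has set the flag; the predicate guards
      // against spurious wakeups.
      startupCv_.wait(lk, [this] { return initialized_; });
      ++started_;
    }
  }

  ~WorkerPool() {
    {
      std::lock_guard<std::mutex> lk(lock_);
      quit_ = true;
    }
    quitCv_.notify_all();
    for (std::thread& t : workers_) t.join();
  }

  unsigned int started() const { return started_; }

private:
  void workerMain(unsigned int /*tid*/) {
    std::unique_lock<std::mutex> lk(lock_);
    initialized_ = true;  // report readiness to the constructor
    startupCv_.notify_one();
    quitCv_.wait(lk, [this] { return quit_; });  // park until shutdown
  }

  std::mutex lock_;
  std::condition_variable startupCv_;
  std::condition_variable quitCv_;
  bool initialized_ = false;
  bool quit_ = false;
  unsigned int started_ = 0;
  std::vector<std::thread> workers_;
};
```

Because the constructor holds the lock while it creates the thread, the worker cannot signal before the constructor is waiting, so the notification is never lost.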
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 182 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

183 {
184  quit_threads_=true;
185 
186  //delete any remaining open files
187  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
188  deleteFile(it->second->fileName_);
189  delete it->second;
190  }
192  readSupervisorThread_->join();
193  }
194  else {
195  //join aux threads in case the supervisor thread was not started
196  for (unsigned int i=0;i<workerThreads_.size();i++) {
197  std::unique_lock<std::mutex> lk(mReader_);
198  thread_quit_signal[i]=true;
199  cvReader_[i]->notify_one();
200  lk.unlock();
201  workerThreads_[i]->join();
202  delete workerThreads_[i];
203  }
204  }
205  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
206  /*
207  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
208  InputChunk *ch;
209  while (!freeChunks_.try_pop(ch)) {}
210  delete ch;
211  }
212  */
213 }
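When the supervisor thread was never started, the destructor stops each worker itself: it sets the worker's quit flag under the mutex, notifies that worker's condition variable, then joins the thread. The sketch below illustrates that signal-then-join order under simplified assumptions. The class Readers and its members are hypothetical, and std::unique_ptr replaces the manual delete calls of the listing.

```cpp
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reader-thread set; illustrates the shutdown order only.
class Readers {
public:
  explicit Readers(unsigned int n) : quit_(n, 0) {
    for (unsigned int i = 0; i < n; ++i)
      cv_.push_back(std::make_unique<std::condition_variable>());
    for (unsigned int i = 0; i < n; ++i)
      threads_.emplace_back(&Readers::run, this, i);
  }

  // Stops every worker and returns how many exited their wait loop.
  unsigned int stopAll() {
    for (unsigned int i = 0; i < threads_.size(); ++i) {
      {
        std::lock_guard<std::mutex> lk(m_);
        quit_[i] = 1;  // set the flag under the mutex the worker waits on
      }
      cv_[i]->notify_one();
      threads_[i].join();
    }
    threads_.clear();
    return exited_;
  }

  ~Readers() { stopAll(); }

private:
  void run(unsigned int tid) {
    std::unique_lock<std::mutex> lk(m_);
    cv_[tid]->wait(lk, [this, tid] { return quit_[tid] != 0; });
    ++exited_;  // still holding m_
  }

  std::mutex m_;
  std::vector<unsigned int> quit_;
  std::vector<std::unique_ptr<std::condition_variable>> cv_;
  unsigned int exited_ = 0;
  std::vector<std::thread> threads_;
};
```

Setting the flag while holding the mutex, and waiting with a predicate, means a worker that has not yet reached its wait still sees the flag and exits.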

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 232 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, newFWLiteAna::found, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

233 {
235  {
236  //late init of directory variable
238 
239  //this thread opens new files and dispatches reading to worker readers
240  //threadInit_.store(false,std::memory_order_release);
241  std::unique_lock<std::mutex> lk(startupLock_);
242  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
244  startupCv_.wait(lk);
245  }
246  //signal hltd to start event accounting
250  switch (nextEvent() ) {
252  //maybe create EoL file in working directory before ending run
253  struct stat buf;
254  if ( currentLumiSection_ > 0) {
255  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
256  if (eolFound) {
258  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
259  if ( !found ) {
261  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
262  close(eol_fd);
264  }
265  }
266  }
267  //also create EoR file in FU data directory
268  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
269  if (!eorFound) {
270  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
271  close(eor_fd);
272  }
274  eventsThisLumi_=0;
276  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
277  return false;
278  }
280  //this is not reachable
281  return true;
282  }
284  //std::cout << "--------------NEW LUMI---------------" << std::endl;
285  return true;
286  }
287  default: {
288  if (!getLSFromFilename_) {
289  //get new lumi from file header
290  if (event_->lumi() > currentLumiSection_) {
292  eventsThisLumi_=0;
293  maybeOpenNewLumiSection( event_->lumi() );
294  }
295  }
298  else
299  eventRunNumber_=event_->run();
300  L1EventID_ = event_->event();
301 
302  setEventCached();
303 
304  return true;
305  }
306  }
307 }
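In the default branch, an event whose lumisection number exceeds currentLumiSection_ closes the previous lumisection: the event counter is reset and maybeOpenNewLumiSection() is called. The sketch below shows one way such per-lumisection accounting could be kept. LumiCounter is a hypothetical class; where the real source increments eventsThisLumi_ and how getEventReport() treats missing entries are not visible in this listing, so both are assumptions here.

```cpp
#include <cassert>
#include <map>
#include <utility>

// Hypothetical tracker mirroring the lumisection-transition branch.
class LumiCounter {
public:
  // Feed the lumisection number of each event, in arrival order.
  void onEvent(unsigned int lumi) {
    if (lumi > currentLumi_) {
      // Close the previous lumisection (0 means "none opened yet").
      if (currentLumi_ > 0) report_[currentLumi_] = eventsThisLumi_;
      eventsThisLumi_ = 0;
      currentLumi_ = lumi;
    }
    ++eventsThisLumi_;  // assumption: counted once per accepted event
  }

  // Returns (found, count) for a closed lumisection and optionally
  // erases the entry so that it is reported only once.
  std::pair<bool, unsigned int> getReport(unsigned int lumi, bool erase) {
    auto it = report_.find(lumi);
    if (it == report_.end()) return {false, 0u};
    std::pair<bool, unsigned int> result{true, it->second};
    if (erase) report_.erase(it);
    return result;
  }

private:
  unsigned int currentLumi_ = 0;
  unsigned int eventsThisLumi_ = 0;
  std::map<unsigned int, unsigned int> report_;
};
```

A lumisection only appears in the report once a later lumisection has started, which matches the "greater than" comparison in the listing.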
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 309 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

310 {
311  //used for backpressure mechanisms and monitoring
312  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
313  struct stat buf;
314  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
315  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
316  close(bol_fd);
317  }
318 }
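createBoLSFile() creates an empty marker file by opening it with O_CREAT and closing it at once; the listing does not check the descriptor returned by open(). A minimal sketch of the same idea with that check added is given below. The helper name touchMarker is hypothetical; the POSIX calls and permission bits are the ones used in the listing.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical helper: create an empty marker file.
// Returns true if the file already exists (and checkIfExists is set)
// or was created; returns false if open() failed.
bool touchMarker(const std::string& path, bool checkIfExists) {
  struct stat buf;
  if (checkIfExists && stat(path.c_str(), &buf) == 0) return true;
  const int fd = open(path.c_str(), O_RDWR | O_CREAT,
                      S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0) return false;  // e.g. missing directory or no permission
  close(fd);
  return true;
}
```

Returning a status lets the caller log or react when the marker cannot be written, instead of passing -1 to close().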
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 641 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, fed_dqm_sourceclient-live_cfg::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

642 {
643  //no deletion in file list mode
644  if (fileListMode_) return;
645 
646  const boost::filesystem::path filePath(fileName);
647  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
648  try {
649  //sometimes this fails but file gets deleted
650  boost::filesystem::remove(filePath);
651  }
652  catch (const boost::filesystem::filesystem_error& ex)
653  {
654  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
655  << ". Trying again.";
656  usleep(100000);
657  try {
658  boost::filesystem::remove(filePath);
659  }
660  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
661  }
662  catch (std::exception& ex)
663  {
664  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
665  << ". Trying again.";
666  usleep(100000);
667  try {
668  boost::filesystem::remove(filePath);
669  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
670  }
671 }
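deleteFile() removes the file, and if the removal throws, it waits 100 ms and tries once more, ignoring a second failure. The sketch below expresses the same retry without exceptions. It swaps in std::filesystem (C++17) and its std::error_code overloads in place of boost::filesystem, and the function name removeWithRetry is hypothetical.

```cpp
#include <cassert>
#include <chrono>
#include <filesystem>
#include <string>
#include <system_error>
#include <thread>

// Hypothetical helper: try to remove a path, retrying once after 100 ms.
// Returns true if the path does not exist afterwards.
bool removeWithRetry(const std::string& fileName) {
  namespace fs = std::filesystem;
  const fs::path p(fileName);
  std::error_code ec;
  fs::remove(p, ec);  // non-throwing overload; ec is set on failure
  if (ec) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    ec.clear();
    fs::remove(p, ec);
  }
  std::error_code existsEc;
  return !fs::exists(p, existsEc);
}
```

Checking for existence at the end covers the case noted in the listing's comments, where the removal reports an error although the file is gone.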
bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 74 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions & descriptions)
static

Definition at line 215 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

216 {
218  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
219  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
220  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
221  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
222  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
223  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
224  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
225  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
226  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
227  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
228  desc.setAllowAnything();
229  descriptions.add("source", desc);
230 }
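eventChunkSize and eventChunkBlock are given in MB; the constructor multiplies them by 1048576, and the block size must not exceed the buffer size. A constructor comment also requires the chunk size to be a whole multiple of the block size. The sketch below shows one way to validate such a pair. ChunkLayout and makeChunkLayout are hypothetical names, and whether the real source throws or adjusts the values is not visible in the listing. The sketch computes in 64 bits, because the multiplication in the listing is done in unsigned int and would wrap at 4096 MB where that type is 32 bits wide.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Hypothetical result type: sizes in bytes plus blocks per chunk.
struct ChunkLayout {
  std::uint64_t chunkBytes;
  std::uint64_t blockBytes;
  std::uint64_t readBlocks;
};

// Hypothetical validation of the two size parameters (both in MB).
ChunkLayout makeChunkLayout(unsigned int chunkMB, unsigned int blockMB) {
  constexpr std::uint64_t kMiB = 1048576;
  if (chunkMB == 0 || blockMB == 0)
    throw std::invalid_argument("sizes must be non-zero");
  if (blockMB > chunkMB)
    throw std::invalid_argument("block size exceeds chunk size");
  if (chunkMB % blockMB != 0)
    throw std::invalid_argument("chunk size is not a multiple of block size");
  return {chunkMB * kMiB, blockMB * kMiB, chunkMB / blockMB};
}
```

With the defaults of 32 and 32, this yields a single 33554432-byte block per chunk.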
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection & rawData)
private

Definition at line 746 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), l1tstage2_dqm_sourceclient-live_cfg::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, and tcds_pointer_.

Referenced by read().

747 {
748  edm::TimeValue_t time;
749  timeval stv;
750  gettimeofday(&stv,0);
751  time = stv.tv_sec;
752  time = (time << 32) + stv.tv_usec;
753  edm::Timestamp tstamp(time);
754 
755  uint32_t eventSize = event_->eventSize();
756  char* event = (char*)event_->payload();
757  GTPEventID_=0;
758  tcds_pointer_ = 0;
759  while (eventSize > 0) {
760  assert(eventSize>=sizeof(fedt_t));
761  eventSize -= sizeof(fedt_t);
762  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
763  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
764  assert(eventSize>=fedSize - sizeof(fedt_t));
765  eventSize -= (fedSize - sizeof(fedt_t));
766  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
767  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
768  if(fedId>FEDNumbering::MAXFEDID)
769  {
770  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
771  }
772  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
773  tcds_pointer_ = (unsigned char *)(event + eventSize );
774  }
775  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
776  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
777  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
778  else
779  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
780  //evf::evtn::evm_board_setformat(fedSize);
781  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
782  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
783  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
784  }
785  //take event ID from GTPE FED
786  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
787  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
788  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
789  }
790  }
791  FEDRawData& fedData = rawData.FEDData(fedId);
792  fedData.resize(fedSize);
793  memcpy(fedData.data(), event + eventSize, fedSize);
794  }
795  assert(eventSize == 0);
796 
797  return tstamp;
798 }
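The loop above walks the event payload from the end: it reads a trailer, takes the fragment size from it, steps back to the fragment header, and repeats until nothing is left. The following is a minimal sketch of that backward walk only. `ToyHeader`, `ToyTrailer`, the two field decoders, and `makeFragment` are hypothetical stand-ins introduced for illustration; the bit layouts are assumptions and are not taken from fed_header.h or fed_trailer.h.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical 8-byte stand-ins for fedh_t / fedt_t (layout assumed for this sketch).
struct ToyHeader { uint32_t sourceid; uint32_t eventid; };
struct ToyTrailer { uint32_t conscheck; uint32_t eventsize; };

// Assumed encodings: size in 8-byte words in the low 24 bits, FED id in bits 8..19.
inline uint32_t toySizeWords(uint32_t eventsize) { return eventsize & 0x00ffffffu; }
inline uint16_t toyFedId(uint32_t sourceid) { return static_cast<uint16_t>((sourceid >> 8) & 0xfffu); }

// Returns (fedId, fragment size in bytes) in visiting order: last fragment first.
std::vector<std::pair<uint16_t, uint32_t>> walkFragments(const unsigned char* payload,
                                                         uint32_t remaining) {
  std::vector<std::pair<uint16_t, uint32_t>> out;
  while (remaining > 0) {
    if (remaining < sizeof(ToyTrailer)) throw std::runtime_error("truncated trailer");
    ToyTrailer trailer;
    std::memcpy(&trailer, payload + remaining - sizeof(ToyTrailer), sizeof(trailer));
    const uint32_t fedSize = toySizeWords(trailer.eventsize) << 3;  // words -> bytes
    if (fedSize < sizeof(ToyHeader) + sizeof(ToyTrailer) || fedSize > remaining)
      throw std::runtime_error("bad fragment size");
    remaining -= fedSize;  // 'remaining' is now the offset of this fragment's header
    ToyHeader header;
    std::memcpy(&header, payload + remaining, sizeof(header));
    out.emplace_back(toyFedId(header.sourceid), fedSize);
  }
  return out;
}

// Builds one toy fragment: header, 'payloadWords' zeroed 8-byte words, trailer.
std::vector<unsigned char> makeFragment(uint16_t fedId, uint32_t payloadWords) {
  const uint32_t words = payloadWords + 2;
  std::vector<unsigned char> buf(words * 8, 0);
  const ToyHeader h{static_cast<uint32_t>(fedId) << 8, 0};
  const ToyTrailer t{0, words};
  std::memcpy(buf.data(), &h, sizeof(h));
  std::memcpy(buf.data() + buf.size() - sizeof(t), &t, sizeof(t));
  return buf;
}
```

Unlike the listing, the sketch reports malformed sizes with exceptions rather than `assert`, so the checks remain active in builds that define `NDEBUG`.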
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1403 of file FedRawDataInputSource.cc.

References monlock_ and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1404 {
1405  std::lock_guard<std::mutex> lock(monlock_);
1406  auto itr = sourceEventsReport_.find(lumi);
1407  if (itr!=sourceEventsReport_.end()) {
1408  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1409  if (erase)
1410  sourceEventsReport_.erase(itr);
1411  return ret;
1412  }
1413  else
1414  return std::pair<bool,unsigned int>(false,0);
1415 }
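The listing is a mutex-guarded lookup that can optionally consume the entry it returns. A minimal sketch of the same pattern, assuming a hypothetical `LumiEventReport` class in place of the `sourceEventsReport_` and `monlock_` members:

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

// Hypothetical container: per-lumisection event counts behind one mutex.
class LumiEventReport {
public:
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns (found, count). With erase == true the entry is removed after reading.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return std::pair<bool, unsigned int>(false, 0u);
    const std::pair<bool, unsigned int> ret(true, itr->second);  // copy before erasing
    if (erase) counts_.erase(itr);
    return ret;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

The count is copied into `ret` before the erase, so the returned value never refers to a destroyed map node.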
std::map< unsigned int, unsigned int > sourceEventsReport_
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1446 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, and evf::EvFDaqDirector::runEnded.

Referenced by readSupervisor().

1447 {
1448  if (fileListIndex_ < fileNames_.size()) {
1449  nextFile = fileNames_[fileListIndex_];
1450  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1451  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1452  boost::filesystem::path fileName = nextFile;
1453  std::string fileStem = fileName.stem().string();
1454  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1455  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1456 
1457  if (!fileListLoopMode_)
1458  ls = boost::lexical_cast<unsigned int>(fileStem);
1459  else //always starting from LS 1 in loop mode
1460  ls = 1 + loopModeIterationInc_;
1461 
1462  //fsize = 0;
1463  //lockWaitTime = 0;
1464  fileListIndex_++;
1466  }
1467  else {
1468  if (!fileListLoopMode_)
1470  else {
1471  //loop through files until interrupted
1473  fileListIndex_=0;
1474  return getFile(ls,nextFile,fsize,lockWaitTime);
1475  }
1476  }
1477 }
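The listing derives the lumisection number from the file name by cutting the stem after "ls" and before the next "_". Note that `std::string::find` returns a position (0 for a match at the start) and `std::string::npos` when nothing is found, so a sketch of this parsing is clearer with explicit `npos` comparisons. The helper names and the example file names below are assumptions made for illustration:

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Strips a leading "file://" or "file:" prefix, in that order of preference.
std::string stripFilePrefix(const std::string& name) {
  if (name.compare(0, 7, "file://") == 0) return name.substr(7);
  if (name.compare(0, 5, "file:") == 0) return name.substr(5);
  return name;
}

// Extracts the number that follows "ls" in the file stem, up to the next '_'.
unsigned int lumiFromPath(const std::string& path) {
  // find_last_of returns npos when there is no '/', and npos + 1 wraps to 0.
  std::string stem = path.substr(path.find_last_of('/') + 1);
  const std::string::size_type dot = stem.rfind('.');
  if (dot != std::string::npos) stem.erase(dot);  // drop the extension
  const std::string::size_type ls = stem.find("ls");
  if (ls == std::string::npos) throw std::invalid_argument("no \"ls\" token in " + path);
  stem = stem.substr(ls + 2);
  const std::string::size_type underscore = stem.find('_');
  if (underscore != std::string::npos) stem.erase(underscore);
  return static_cast<unsigned int>(std::stoul(stem));  // throws if no digits follow
}
```

With these checks a stem that begins with "ls" is parsed as well, and a stem without "ls" is rejected with a descriptive exception.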
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 371 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, streamFileTrackerPtr_, threadError(), mps_check::timeout, evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

372 {
373 
375  if (!currentFile_)
376  {
377  if (!streamFileTrackerPtr_) {
381  }
382 
385  if (!fileQueue_.try_pop(currentFile_))
386  {
387  //sleep until wakeup (only in single-buffer mode) or timeout
388  std::unique_lock<std::mutex> lkw(mWakeup_);
389  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
391  }
392  status = currentFile_->status_;
393  if ( status == evf::EvFDaqDirector::runEnded)
394  {
396  delete currentFile_;
397  currentFile_=nullptr;
398  return status;
399  }
400  else if ( status == evf::EvFDaqDirector::runAbort)
401  {
402  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
403  }
404  else if (status == evf::EvFDaqDirector::newLumi)
405  {
407  if (getLSFromFilename_) {
410  eventsThisLumi_=0;
412  }
413  }
414  else {//let this be picked up from next event
416  }
417 
418  delete currentFile_;
419  currentFile_=nullptr;
420  return status;
421  }
422  else if (status == evf::EvFDaqDirector::newFile) {
425  }
426  else
427  assert(0);
428  }
430 
431  //file is empty
432  if (!currentFile_->fileSize_) {
434  //try to open new lumi
436  if (getLSFromFilename_)
439  eventsThisLumi_=0;
441  }
442  //immediately delete empty file
444  delete currentFile_;
445  currentFile_=nullptr;
447  }
448 
449  //file is finished
452  //release last chunk (it is never released elsewhere)
455  {
456  throw cms::Exception("FedRawDataInputSource::getNextEvent")
457  << "Fully processed " << currentFile_->nProcessed_
458  << " from the file " << currentFile_->fileName_
459  << " but according to BU JSON there should be "
460  << currentFile_->nEvents_ << " events";
461  }
462  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
463  if (singleBufferMode_) {
464  std::unique_lock<std::mutex> lkw(mWakeup_);
465  cvWakeup_.notify_one();
466  }
469  //put the file in pending delete list;
470  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
471  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
472  }
473  else {
474  //in single-thread and stream jobs, events are already processed
476  delete currentFile_;
477  }
478  currentFile_=nullptr;
480  }
481 
482 
483  //file is too short
485  {
486  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
487  "Premature end of input file while reading event header";
488  }
489  if (singleBufferMode_) {
490 
491  //should already be there
494  usleep(10000);
496  }
498 
499  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
500 
501  //conditions when read amount is not sufficient for the header to fit
502  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
504  {
506 
507  if (detectedFRDversion_==0) {
508  detectedFRDversion_=*((uint32*)dataPosition);
509  if (detectedFRDversion_>5)
510  throw cms::Exception("FedRawDataInputSource::getNextEvent")
511  << "Unknown FRD version -: " << detectedFRDversion_;
512  assert(detectedFRDversion_>=1);
513  }
514 
515  //recalculate chunk position
516  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
517  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
518  {
519  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
520  "Premature end of input file while reading event header";
521  }
522  }
523 
524  event_.reset( new FRDEventMsgView(dataPosition) );
525  if (event_->size()>eventChunkSize_) {
526  throw cms::Exception("FedRawDataInputSource::getNextEvent")
527  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
528  << " run:" << event_->run() << " of size:" << event_->size()
529  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
530  }
531 
532  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
533 
535  {
536  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
537  "Premature end of input file while reading event data";
538  }
539  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
541  //recalculate chunk position
542  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
543  event_.reset( new FRDEventMsgView(dataPosition) );
544  }
545  currentFile_->bufferPosition_ += event_->size();
546  currentFile_->chunkPosition_ += event_->size();
547  //last chunk is released when this function is invoked next time
548 
549  }
550  //multibuffer mode:
551  else
552  {
553  //wait for the current chunk to become added to the vector
556  usleep(10000);
558  }
560 
561  //check if header is at the boundary of two chunks
562  chunkIsFree_ = false;
563  unsigned char *dataPosition;
564 
565  //read header, copy it to a single chunk if necessary
566  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
567 
568  event_.reset( new FRDEventMsgView(dataPosition) );
569  if (event_->size()>eventChunkSize_) {
570  throw cms::Exception("FedRawDataInputSource::getNextEvent")
571  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
572  << " run:" << event_->run() << " of size:" << event_->size()
573  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
574  }
575 
576  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
577 
579  {
580  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
581  "Premature end of input file while reading event data";
582  }
583 
584  if (chunkEnd) {
585  //header was at the chunk boundary, we will have to move payload as well
586  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
587  chunkIsFree_ = true;
588  }
589  else {
590  //header was contiguous, but check if payload fits the chunk
591  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
592  //rewind to header start position
593  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
594  //copy event to a chunk start and move pointers
595  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
596  assert(chunkEnd);
597  chunkIsFree_=true;
598  //header is moved
599  event_.reset( new FRDEventMsgView(dataPosition) );
600  }
601  else {
602  //everything is in a single chunk, only move pointers forward
603  chunkEnd = currentFile_->advance(dataPosition,msgSize);
604  assert(!chunkEnd);
605  chunkIsFree_=false;
606  }
607  }
608  }//end multibuffer mode
610 
611  if (verifyChecksum_ && event_->version() >= 5)
612  {
613  uint32_t crc=0;
614  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
615  if ( crc != event_->crc32c() ) {
617  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
618  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
619  " but calculated 0x" << crc;
620  }
621  }
622  else if ( verifyAdler32_ && event_->version() >= 3)
623  {
624  uint32_t adler = adler32(0L,Z_NULL,0);
625  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
626 
627  if ( adler != event_->adler32() ) {
629  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
630  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
631  " but calculated 0x" << adler;
632  }
633  }
635 
637 
639 }
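The final part of the listing recomputes a checksum over the event payload and throws when it differs from the value stored in the event header. The sketch below illustrates the Adler-32 branch only. It spells out the standard Adler-32 algorithm so that it has no zlib dependency (the listing itself calls zlib's `adler32`); `adler32Sketch` and `verifyPayload` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Plain Adler-32: two running sums modulo 65521, packed as (b << 16) | a.
uint32_t adler32Sketch(const unsigned char* data, std::size_t len) {
  const uint32_t kMod = 65521;  // largest prime below 2^16
  uint32_t a = 1, b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Hypothetical helper: throws when the stored checksum disagrees with the payload.
void verifyPayload(const std::string& payload, uint32_t expected) {
  const uint32_t got =
      adler32Sketch(reinterpret_cast<const unsigned char*>(payload.data()), payload.size());
  if (got != expected) throw std::runtime_error("Adler32 checksum mismatch");
}
```

For production use the zlib routine is the natural choice; the byte-at-a-time loop here is only meant to make the comparison step easy to follow.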
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
FedRawDataInputSource * parent_
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 800 of file FedRawDataInputSource.cc.

References daqDirector_, jsoncollector::DataPoint::deserialize(), dpd_, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), LogDebug, Json::Reader::parse(), and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

801 {
803  try {
804  // assemble json destination path
806 
807  //TODO:should be ported to use fffnaming
808  std::ostringstream fileNameWithPID;
809  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
810  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
811  jsonDestPath /= fileNameWithPID.str();
812 
813  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
814  << jsonDestPath;
815  try {
816  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
817  }
818  catch (const boost::filesystem::filesystem_error& ex)
819  {
820  // Input dir gone?
821  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
822  // << " Maybe the file is not yet visible by FU. Trying again in one second";
823  sleep(1);
824  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
825  }
827 
828  try {
829  //sometimes this fails but file gets deleted
830  boost::filesystem::remove(jsonSourcePath);
831  }
832  catch (const boost::filesystem::filesystem_error& ex)
833  {
834  // Input dir gone?
835  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
836  }
837  catch (std::exception& ex)
838  {
839  // Input dir gone?
840  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
841  }
842 
843  boost::filesystem::ifstream ij(jsonDestPath);
844  Json::Value deserializeRoot;
846 
847  std::stringstream ss;
848  ss << ij.rdbuf();
849  if (!reader.parse(ss.str(), deserializeRoot)) {
850  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
851  << "\nERROR:\n" << reader.getFormatedErrorMessages()
852  << "CONTENT:\n" << ss.str()<<".";
853  throw std::runtime_error("Cannot deserialize input JSON file");
854  }
855 
856  //read BU JSON
858  DataPoint dp;
859  dp.deserialize(deserializeRoot);
860  bool success = false;
861  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
862  if (dpd_->getNames().at(i)=="NEvents")
863  if (i<dp.getData().size()) {
864  data = dp.getData()[i];
865  success=true;
866  }
867  }
868  if (!success) {
869  if (dp.getData().size())
870  data = dp.getData()[0];
871  else
872  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
873  " error reading number of events from BU JSON -: No input value " << data;
874  }
875  return boost::lexical_cast<int>(data);
876 
877  }
878  catch (const boost::filesystem::filesystem_error& ex)
879  {
880  // Input dir gone?
882  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
883  }
884  catch (std::runtime_error e)
885  {
886  // Another process grabbed the file and NFS did not register this
888  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
889  }
890 
891  catch( boost::bad_lexical_cast const& ) {
892  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
893  << "Input value is -: " << data;
894  }
895 
896  catch (std::exception e)
897  {
898  // BU run directory disappeared?
900  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
901  }
902 
903  return -1;
904 }
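Once the JSON is parsed, the listing looks up the value whose definition name is "NEvents" and falls back to the first value when no such name is available. A minimal sketch of that selection, assuming the names and values are already available as two parallel string vectors (`eventsFromDataPoint` is a hypothetical helper, not part of jsoncollector):

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Picks the "NEvents" entry by name; falls back to the first value when the
// name is absent or has no matching value slot. Throws when there is no value.
int eventsFromDataPoint(const std::vector<std::string>& names,
                        const std::vector<std::string>& values) {
  if (values.empty()) throw std::runtime_error("no input value in BU JSON");
  std::string data = values[0];  // fallback
  for (std::size_t i = 0; i < names.size() && i < values.size(); ++i) {
    if (names[i] == "NEvents") data = values[i];
  }
  return std::stoi(data);  // throws std::invalid_argument on non-numeric input
}
```

The listing performs the conversion with `boost::lexical_cast<int>`; `std::stoi` is swapped in here only to keep the sketch free of external dependencies.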
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1417 of file FedRawDataInputSource.cc.

References fileNames_.

Referenced by FedRawDataInputSource().

1418 {
1419  std::sort(fileNames_.begin(),fileNames_.end(),
1420  [](std::string a, std::string b) {
1421  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1422  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1423  return b > a;});
1424 
1425  if (fileNames_.size()) {
1426  //get run number from first file in the vector
1428  std::string fileStem = fileName.stem().string();
1429  auto end = fileStem.find("_");
1430  if (fileStem.find("run")==0) {
1431  std::string runStr = fileStem.substr(3,end-3);
1432  try {
1433  //get long to support test run numbers < 2^32
1434  long rval = boost::lexical_cast<long>(runStr);
1435  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1436  return rval;
1437  }
1438  catch( boost::bad_lexical_cast const& ) {
1439  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1440  }
1441  }
1442  }
1443  return -1;
1444 }
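The function sorts the file list by file name (ignoring the directory part) and then tries to read the run number from a stem of the form "run<digits>_...". A minimal sketch of both steps, with hypothetical helper names and example file names chosen for illustration:

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <stdexcept>
#include <string>
#include <vector>

// File name without its directory part.
std::string baseName(const std::string& path) {
  const std::string::size_type slash = path.rfind('/');
  return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Orders paths by base name so files from different directories interleave.
void sortByBaseName(std::vector<std::string>& files) {
  std::sort(files.begin(), files.end(), [](const std::string& a, const std::string& b) {
    return baseName(a) < baseName(b);
  });
}

// Returns the run number from a stem such as "run000123_ls0001", or -1.
long runFromStem(const std::string& stem) {
  if (stem.compare(0, 3, "run") != 0) return -1;
  const std::string::size_type end = stem.find('_');
  const std::string digits =
      stem.substr(3, end == std::string::npos ? std::string::npos : end - 3);
  const bool allDigits = !digits.empty() &&
      std::all_of(digits.begin(), digits.end(),
                  [](unsigned char c) { return std::isdigit(c) != 0; });
  if (!allDigits) return -1;
  try {
    return std::stol(digits);
  } catch (const std::out_of_range&) {
    return -1;  // number does not fit in a long
  }
}
```

Returning -1 for every failure mirrors the listing's convention that a negative value means the run number could not be autodetected.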
std::vector< std::string > fileNames_
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 320 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

321 {
323  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
324 
325  if ( currentLumiSection_ > 0) {
326  const std::string fuEoLS =
328  struct stat buf;
329  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
330  if ( !found ) {
332  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
333  close(eol_fd);
334  createBoLSFile(lumiSection,false);
336  }
337  }
338  else createBoLSFile(lumiSection,true);//needed for initial lumisection
339 
340  currentLumiSection_ = lumiSection;
341 
343 
344  timeval tv;
345  gettimeofday(&tv, 0);
346  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
347 
348  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
350  runAuxiliary()->run(),
351  lumiSection, lsopentime,
353 
354  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
355  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
356 
357  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
358  }
359 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:590
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:357
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:596
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:262
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:365
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:265
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 361 of file FedRawDataInputSource.cc.

References getNextEvent(), evf::EvFDaqDirector::noFile, and edm::shutdown_flag.

Referenced by checkNextEvent().

362 {
364  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
365  {
366  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
367  }
368  return status;
369 }
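nextEvent() keeps calling getNextEvent() while it reports that no file is available, and leaves the loop early once the shutdown flag is set. A minimal sketch of that polling loop, with a reduced hypothetical `FileStatus` enum, a local `shutdownRequested` flag standing in for edm::shutdown_flag, and the poll function passed in as a callable:

```cpp
#include <atomic>
#include <cassert>

// Reduced, hypothetical stand-in for evf::EvFDaqDirector::FileStatus.
enum class FileStatus { noFile, newFile, runEnded };

// Stand-in for edm::shutdown_flag.
std::atomic<bool> shutdownRequested{false};

// Calls poll() until it returns something other than noFile, or until a
// shutdown has been requested; the last status seen is returned either way.
template <typename Poll>
FileStatus pollUntilReady(Poll poll) {
  FileStatus status;
  while ((status = poll()) == FileStatus::noFile) {
    if (shutdownRequested.load(std::memory_order_relaxed)) break;
  }
  return status;
}
```

A caller that receives `noFile` from this loop can conclude that a shutdown was requested, since that is the only way the loop exits with that status.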
volatile std::atomic< bool > shutdown_flag
evf::EvFDaqDirector::FileStatus getNextEvent()
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource > )
override private virtual

Definition at line 909 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

910 {
911  InputSource::rewind();
915 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:353
void FedRawDataInputSource::preForkReleaseResources ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 906 of file FedRawDataInputSource.cc.

907 {}
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 674 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

675 {
677  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
678  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
679 
680  if (useL1EventID_){
682  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
684  aux.setProcessHistoryID(processHistoryID_);
685  makeEvent(eventPrincipal, aux);
686  }
687  else if(tcds_pointer_==0){
690  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
692  aux.setProcessHistoryID(processHistoryID_);
693  makeEvent(eventPrincipal, aux);
694  }
695  else{
696  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
701  makeEvent(eventPrincipal, aux);
702  }
703 
704 
705 
706  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
707 
708  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
709  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
710  // daqProvenanceHelper_.dummyProvenance_);
711 
712  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
714 
715  eventsThisLumi_++;
717 
718  //this old file check runs no more often than every 10 events
719  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
720  //delete files that are not in processing
721  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
722  auto it = filesToDelete_.begin();
723  while (it!=filesToDelete_.end()) {
724  bool fileIsBeingProcessed = false;
725  for (unsigned int i=0;i<nStreams_;i++) {
726  if (it->first == streamFileTrackerPtr_->at(i)) {
727  fileIsBeingProcessed = true;
728  break;
729  }
730  }
731  if (!fileIsBeingProcessed) {
732  deleteFile(it->second->fileName_);
733  delete it->second;
734  it = filesToDelete_.erase(it);
735  }
736  else it++;
737  }
738 
739  }
741  chunkIsFree_=false;
743  return;
744 }
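The cleanup block near the end of read() walks the pending-delete list and removes every file that no stream is currently processing. A minimal sketch of that erase-while-iterating pattern, using file names in place of `InputFile*` and a hypothetical `purgeUnused` helper:

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Removes pending entries whose file index is not used by any stream and
// returns the names of the removed files, in list order.
std::vector<std::string> purgeUnused(std::list<std::pair<int, std::string>>& pending,
                                     const std::vector<int>& streamFileIndex) {
  std::vector<std::string> removed;
  for (auto it = pending.begin(); it != pending.end();) {
    const bool inUse = std::find(streamFileIndex.begin(), streamFileIndex.end(),
                                 it->first) != streamFileIndex.end();
    if (inUse) {
      ++it;
    } else {
      removed.push_back(it->second);
      it = pending.erase(it);  // erase returns the iterator to the next element
    }
  }
  return removed;
}
```

In the listing the same traversal runs while holding `fileDeleteLock_`; the sketch leaves locking to the caller.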
void setInState(FastMonitoringThread::InputState inputState)
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:217
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1338 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1339 {
1340 
1341  if (fileDescriptor_<0) {
1342  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1343  bufferInputRead_ = 0;
1344  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1345  if (fileDescriptor_>=0)
1346  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1347  else
1348  {
1349  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1350  << file->fileName_ << " fd:" << fileDescriptor_;
1351  }
1352  }
1353 
1354  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1355  uint32_t existingSize = 0;
1356  for (unsigned int i=0;i<readBlocks_;i++)
1357  {
1358  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1359  bufferInputRead_+=last;
1360  existingSize+=last;
1361  }
1362  }
1363  else {
1364  const uint32_t chunksize = file->chunkPosition_;
1365  const uint32_t blockcount=chunksize/eventChunkBlock_;
1366  const uint32_t leftsize = chunksize%eventChunkBlock_;
1367  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1368  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1369 
1370  for (uint32_t i=0;i<blockcount;i++) {
1371  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1372  bufferInputRead_+=last;
1373  existingSize+=last;
1374  }
1375  if (leftsize) {
1376  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1377  bufferInputRead_+=last;
1378  }
1379  file->chunkPosition_=0;//data was moved to beginning of the chunk
1380  }
1381  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1382  if (fileDescriptor_!=-1)
1383  {
1384  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1385  close(fileDescriptor_);
1386  fileDescriptor_=-1;
1387  }
1388  }
1389 }
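
The listing above adds the return value of ::read() directly to bufferInputRead_ and existingSize, so a failed read (return value -1) or a short read would go unnoticed. As a minimal sketch, assuming a plain POSIX file descriptor, a block-wise read loop that checks every return value could look as follows. readInBlocks and its parameters are hypothetical names introduced for illustration; they are not part of FedRawDataInputSource.

```cpp
#include <cerrno>
#include <cstddef>
#include <sys/types.h>  // ssize_t
#include <unistd.h>     // POSIX read()

// Hypothetical helper, not part of FedRawDataInputSource.
// Reads up to `want` bytes from `fd` into `buf`, requesting at most
// `block` bytes per call. Returns the number of bytes actually read
// (which may be less than `want` at end of file), or -1 on a read error.
ssize_t readInBlocks(int fd, unsigned char* buf, std::size_t want, std::size_t block) {
  std::size_t done = 0;
  while (done < want) {
    const std::size_t request = (want - done < block) ? (want - done) : block;
    const ssize_t got = ::read(fd, buf + done, request);
    if (got < 0) {
      if (errno == EINTR) continue;  // interrupted before any data arrived: retry
      return -1;                     // real error: never add -1 to a byte counter
    }
    if (got == 0) break;             // end of file (or a zero-sized request)
    done += static_cast<std::size_t>(got);
  }
  return static_cast<ssize_t>(done);
}
```

A caller would compare the returned count with the expected size and raise an error on mismatch, instead of accumulating the raw return value.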
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 921 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, j, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, fed_dqm_sourceclient-live_cfg::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

922 {
923  bool stop=false;
924  unsigned int currentLumiSection = 0;
925  //threadInit_.exchange(true,std::memory_order_acquire);
926 
927  {
928  std::unique_lock<std::mutex> lk(startupLock_);
929  startupCv_.notify_one();
930  }
931 
932  uint32_t ls=0;
933  uint32_t monLS=1;
934  uint32_t lockCount=0;
935  uint64_t sumLockWaitTimeUs=0.;
936 
937  while (!stop) {
938 
939  //wait for at least one free thread and chunk
940  int counter=0;
942  {
943  //report state to monitoring
944  if (fms_) {
945  bool copy_active=false;
946  for (auto j : tid_active_) if (j) copy_active=true;
948  else if (freeChunks_.empty()) {
949  if (copy_active)
951  else
953  }
954  else {
955  if (copy_active)
957  else
959  }
960  }
961  std::unique_lock<std::mutex> lkw(mWakeup_);
962  //sleep until woken up by condition or a timeout
963  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
964  counter++;
965  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
966  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
967  }
968  else {
969  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
970  }
971  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
972  }
973 
974  if (stop) break;
975 
976  //look for a new file
977  std::string nextFile;
978  uint32_t fileSize;
979 
980  if (fms_) {
984  }
985 
987 
988  while (status == evf::EvFDaqDirector::noFile) {
989  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
990  stop=true;
991  break;
992  }
993 
994  uint64_t thisLockWaitTimeUs=0.;
995  if (fileListMode_) {
996  //return LS if LS not set, otherwise return file
997  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
998  }
999  else
1000  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1001 
1003 
1004  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1005 
1006  //monitoring of lock wait time
1007  if (thisLockWaitTimeUs>0.)
1008  sumLockWaitTimeUs+=thisLockWaitTimeUs;
1009  lockCount++;
1010  if (ls>monLS) {
1011  monLS=ls;
1012  if (lockCount)
1013  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1014  lockCount=0;
1015  sumLockWaitTimeUs=0;
1016  }
1017 
1018  //check again for any remaining index/EoLS files after EoR file is seen
1019  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1021  usleep(100000);
1022  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1023  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1024  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1025  }
1026 
1027  if ( status == evf::EvFDaqDirector::runEnded) {
1029  stop=true;
1030  break;
1031  }
1032 
1033  //queue new lumisection
1034  if( getLSFromFilename_ && ls > currentLumiSection) {
1035  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1036  currentLumiSection = ls;
1037  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1038  }
1039 
1040  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1041  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1043  stop=true;
1044  break;
1045  }
1046 
1047  int dbgcount=0;
1048  if (status == evf::EvFDaqDirector::noFile) {
1050  dbgcount++;
1051  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1052  usleep(100000);
1053  }
1054  }
1055  if ( status == evf::EvFDaqDirector::newFile ) {
1057  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1058 
1059 
1060  boost::filesystem::path rawFilePath(nextFile);
1061  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1062 
1063  struct stat st;
1064  int stat_res = stat(rawFile.c_str(),&st);
1065  if (stat_res==-1) {
1066  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1067  setExceptionState_=true;
1068  stop=true;
1069  break;
1070  }
1071  fileSize=st.st_size;
1072 
1073  if (fms_) {
1077  }
1078  int eventsInNewFile;
1079  if (fileListMode_) {
1080  if (fileSize==0) eventsInNewFile=0;
1081  else eventsInNewFile=-1;
1082  }
1083  else {
1084  eventsInNewFile = grabNextJsonFile(nextFile);
1085  assert( eventsInNewFile>=0 );
1086  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1087  }
1088  //int eventsInNewFile = fileListMode_ ? -1 : grabNextJsonFile(nextFile);
1089  //if (fileListMode_ && fileSize==0) {eventsInNewFile=0;}
1090  //if (!fileListMode_) {
1091  // assert( eventsInNewFile>=0 );
1092  // assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1093  //}
1094 
1095  if (!singleBufferMode_) {
1096  //calculate number of needed chunks
1097  unsigned int neededChunks = fileSize/eventChunkSize_;
1098  if (fileSize%eventChunkSize_) neededChunks++;
1099 
1100  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1102  fileQueue_.push(newInputFile);
1103 
1104  for (unsigned int i=0;i<neededChunks;i++) {
1105 
1106  if (fms_) {
1107  bool copy_active=false;
1108  for (auto j : tid_active_) if (j) copy_active=true;
1109  if (copy_active)
1111  else
1113  }
1114  //get thread
1115  unsigned int newTid = 0xffffffff;
1116  while (!workerPool_.try_pop(newTid)) {
1117  usleep(100000);
1118  }
1119 
1120  if (fms_) {
1121  bool copy_active=false;
1122  for (auto j : tid_active_) if (j) copy_active=true;
1123  if (copy_active)
1125  else
1127  }
1128  InputChunk * newChunk = nullptr;
1129  while (!freeChunks_.try_pop(newChunk)) {
1130  usleep(100000);
1131  if (quit_threads_.load(std::memory_order_relaxed)) break;
1132  }
1133 
1134  if (newChunk == nullptr) {
1135  //return unused tid if we received shutdown (nullptr chunk)
1136  if (newTid!=0xffffffff) workerPool_.push(newTid);
1137  stop = true;
1138  break;
1139  }
1141 
1142  std::unique_lock<std::mutex> lk(mReader_);
1143 
1144  unsigned int toRead = eventChunkSize_;
1145  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1146  newChunk->reset(i*eventChunkSize_,toRead,i);
1147 
1148  workerJob_[newTid].first=newInputFile;
1149  workerJob_[newTid].second=newChunk;
1150 
1151  //wake up the worker thread
1152  cvReader_[newTid]->notify_one();
1153  }
1154  }
1155  else {
1156  if (!eventsInNewFile) {
1157  //still queue file for lumi update
1158  std::unique_lock<std::mutex> lkw(mWakeup_);
1159  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1161  fileQueue_.push(newInputFile);
1162  cvWakeup_.notify_one();
1163  return;
1164  }
1165  //in single-buffer mode put single chunk in the file and let the main thread read the file
1166  InputChunk * newChunk;
1167  //should be available immediately
1168  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1169 
1170  std::unique_lock<std::mutex> lkw(mWakeup_);
1171 
1172  unsigned int toRead = eventChunkSize_;
1173  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1174  newChunk->reset(0,toRead,0);
1175  newChunk->readComplete_=true;
1176 
1177  //push file and wakeup main thread
1178  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1179  newInputFile->chunks_[0]=newChunk;
1181  fileQueue_.push(newInputFile);
1182  cvWakeup_.notify_one();
1183  }
1184  }
1185  }
1187  //make sure threads finish reading
1188  unsigned numFinishedThreads = 0;
1189  while (numFinishedThreads < workerThreads_.size()) {
1190  unsigned int tid;
1191  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1192  std::unique_lock<std::mutex> lk(mReader_);
1193  thread_quit_signal[tid]=true;
1194  cvReader_[tid]->notify_one();
1195  numFinishedThreads++;
1196  }
1197  for (unsigned int i=0;i<workerThreads_.size();i++) {
1198  workerThreads_[i]->join();
1199  delete workerThreads_[i];
1200  }
1201 }
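
The chunk bookkeeping at lines 1097-1098 and 1144-1146 of the listing (round the chunk count up, then shorten only the last chunk) can be isolated as a small pure function. This is a sketch under stated assumptions: ChunkPlan and planChunks are hypothetical names, and the offset is widened to 64 bits here as a design choice, whereas the listing passes i*eventChunkSize_ as an unsigned int.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical description of one chunk: where it starts in the file,
// how many bytes to read, and its index in the file's chunk vector.
struct ChunkPlan {
  std::uint64_t offset;
  std::uint32_t toRead;
  unsigned int index;
};

// Splits a file of `fileSize` bytes into chunks of at most `chunkSize` bytes.
std::vector<ChunkPlan> planChunks(std::uint32_t fileSize, std::uint32_t chunkSize) {
  std::vector<ChunkPlan> plan;
  if (chunkSize == 0) return plan;  // guard against division by zero

  // Ceiling division: a partial trailing chunk still needs its own buffer.
  unsigned int needed = fileSize / chunkSize;
  if (fileSize % chunkSize) needed++;

  for (unsigned int i = 0; i < needed; i++) {
    std::uint32_t toRead = chunkSize;
    // Only the last chunk can be shorter than chunkSize.
    if (i == needed - 1 && fileSize % chunkSize) toRead = fileSize % chunkSize;
    plan.push_back({static_cast<std::uint64_t>(i) * chunkSize, toRead, i});
  }
  return plan;
}
```

For example, a 10-byte file with 4-byte chunks yields three chunks at offsets 0, 4 and 8, the last one 2 bytes long; an empty file yields no chunks.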
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1203 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, mps_update::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, init, plotBeamSpotDB::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, dqm_diff::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1204 {
1205  bool init = true;
1206  threadInit_.exchange(true,std::memory_order_acquire);
1207 
1208  while (1) {
1209 
1210  tid_active_[tid]=false;
1211  std::unique_lock<std::mutex> lk(mReader_);
1212  workerJob_[tid].first=nullptr;
1213  workerJob_[tid].second=nullptr;
1214 
1215  assert(!thread_quit_signal[tid]);//should never get it here
1216  workerPool_.push(tid);
1217 
1218  if (init) {
1219  std::unique_lock<std::mutex> lk(startupLock_);
1220  init = false;
1221  startupCv_.notify_one();
1222  }
1223  cvReader_[tid]->wait(lk);
1224 
1225  if (thread_quit_signal[tid]) return;
1226  tid_active_[tid]=true;
1227 
1228  InputFile * file;
1229  InputChunk * chunk;
1230 
1231  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1232 
1233  file = workerJob_[tid].first;
1234  chunk = workerJob_[tid].second;
1235 
1236  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1237  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1238 
1239 
1240  if (fileDescriptor>=0)
1241  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1242  else
1243  {
1244  edm::LogError("FedRawDataInputSource") <<
1245  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1246  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1247  setExceptionState_=true;
1248  return;
1249 
1250  }
1251 
1252  unsigned int bufferLeft = 0;
1254  for (unsigned int i=0;i<readBlocks_;i++)
1255  {
1256  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1257  if ( last > 0 )
1258  bufferLeft+=last;
1259  if (last < eventChunkBlock_) {
1260  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1261  break;
1262  }
1263  }
1265  auto diff = end-start;
1266  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1267  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1268  close(fileDescriptor);
1269 
1270  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1272  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing
1273  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1274 
1275  }
1276 }
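
readWorker() parks on cvReader_[tid]->wait(lk) without a predicate. In standard C++ a condition variable may wake spuriously, so a common defensive pattern is to wait on an explicit condition. The following is a generic sketch of such a supervisor/worker handshake, not the class's implementation; JobSlot, workerLoop, postJob, requestQuit and runDemo are hypothetical names, and the "job" is reduced to an integer.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical single-worker job slot guarded by one mutex.
struct JobSlot {
  std::mutex m;
  std::condition_variable cv;
  bool hasJob = false;
  bool quit = false;
  int job = 0;
};

// Worker side: sleep until a job or a quit request is visible.
void workerLoop(JobSlot& slot, int& sum) {
  for (;;) {
    std::unique_lock<std::mutex> lk(slot.m);
    // The predicate makes spurious wakeups harmless.
    slot.cv.wait(lk, [&] { return slot.hasJob || slot.quit; });
    if (slot.hasJob) {  // finish pending work before honouring quit
      sum += slot.job;
      slot.hasJob = false;
      slot.cv.notify_all();  // tell the supervisor the slot is free again
      continue;
    }
    return;  // quit requested and nothing pending
  }
}

// Supervisor side: wait for a free slot, then hand over one job.
void postJob(JobSlot& slot, int job) {
  std::unique_lock<std::mutex> lk(slot.m);
  slot.cv.wait(lk, [&] { return !slot.hasJob; });
  slot.job = job;
  slot.hasJob = true;
  slot.cv.notify_all();
}

void requestQuit(JobSlot& slot) {
  std::lock_guard<std::mutex> lk(slot.m);
  slot.quit = true;
  slot.cv.notify_all();
}

// Posts three jobs, shuts the worker down, and returns the sum it computed.
int runDemo() {
  JobSlot slot;
  int sum = 0;
  std::thread t(workerLoop, std::ref(slot), std::ref(sum));
  postJob(slot, 1);
  postJob(slot, 2);
  postJob(slot, 3);
  requestQuit(slot);
  t.join();
  return sum;
}
```

Because both sides change hasJob and quit only while holding the mutex, a notification cannot be lost between the predicate check and the wait.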
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int lumi, unsigned int events )
private

Definition at line 1392 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1393 {
1394 
1395  std::lock_guard<std::mutex> lock(monlock_);
1396  auto itr = sourceEventsReport_.find(lumi);
1397  if (itr!=sourceEventsReport_.end())
1398  itr->second+=events;
1399  else
1400  sourceEventsReport_[lumi]=events;
1401 }
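
The find-then-accumulate pattern in the listing, together with the getEventReport(lumi, erase) signature declared for this class, suggests a mutex-guarded per-lumisection counter. A minimal sketch of that idea follows. LumiEventCounter and its methods are hypothetical, and the behaviour of report() (in particular what is returned for an unknown lumisection) is an assumption, since the body of getEventReport() is not shown in this section.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the monlock_ / sourceEventsReport_ pair.
class LumiEventCounter {
public:
  // Adds `events` to the running total for `lumi`.
  void add(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    // operator[] value-initialises a missing entry to 0, so a single
    // statement covers both the "found" and "not found" branches.
    counts_[lumi] += events;
  }

  // Returns {true, total} if `lumi` is known, {false, 0} otherwise
  // (assumed semantics). With erase == true the entry is removed.
  std::pair<bool, unsigned int> report(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end()) return {false, 0};
    const unsigned int total = itr->second;
    if (erase) counts_.erase(itr);
    return {true, total};
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Calling add(5, 10) and then add(5, 7) leaves a total of 17 for lumisection 5; a later report(5, true) returns that total and removes the entry.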
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 917 of file FedRawDataInputSource.cc.

918 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1278 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1279 {
1280  quit_threads_=true;
1281  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1282 
1283 }

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 128 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private
std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 166 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 184 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private
bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 183 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private