test
CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
virtual ~FedRawDataInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr
< ProductRegistry > & 
productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary > 
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair
< SharedResourcesAcquirer
*, std::recursive_mutex * > 
resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
bool skipForForking ()
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

virtual bool checkNextEvent () override
 
virtual void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 [Brief garbled by Doxygen: it is a fragment of the neighbouring accessor "eventCached() const {return eventCached_;}" and does not describe setEventCached().] More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

void createBoLSFile (const uint32_t lumiSection, bool checkIfExists)
 
void deleteFile (std::string const &)
 
bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
int grabNextJsonFile (boost::filesystem::path const &)
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
virtual void postForkReacquireResources (std::shared_ptr< edm::multicore::MessageReceiverForSource >) override
 
virtual void preForkReleaseResources () override
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
virtual void rewind_ () override
 
void threadError ()
 

Private Attributes

uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ =false
 
InputFile * currentFile_ = 0
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ =0
 
jsoncollector::DataPointDefinition * dpd_
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ =0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< InputFile * > 
fileQueue_
 
std::list< std::pair< int,
InputFile * > > 
filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int,
unsigned int > 
sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > * streamFileTrackerPtr_ = 0
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
const bool useL1EventID_
 
const bool verifyAdler32_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

class InputChunk
 
class InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext
const &, ModuleCallingContext
const &)> 
postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext
const &, ModuleCallingContext
const &)> 
preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 43 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair<InputFile*,InputChunk*> FedRawDataInputSource::ReaderInfo
private

Definition at line 138 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 54 of file FedRawDataInputSource.cc.

References assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, defPath_, dpd_, alignCSCRings::e, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, cppFunctionSkipper::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, workerJob_, and workerThreads_.

55  :
57  defPath_(pset.getUntrackedParameter<std::string> ("buDefPath", std::string(getenv("CMSSW_BASE"))+"/src/EventFilter/Utilities/plugins/budef.jsd")),
58  eventChunkSize_(pset.getUntrackedParameter<unsigned int> ("eventChunkSize",32)*1048576),
59  eventChunkBlock_(pset.getUntrackedParameter<unsigned int> ("eventChunkBlock",32)*1048576),
60  numBuffers_(pset.getUntrackedParameter<unsigned int> ("numBuffers",2)),
61  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int> ("maxBufferedFiles",2)),
62  getLSFromFilename_(pset.getUntrackedParameter<bool> ("getLSFromFilename", true)),
63  verifyAdler32_(pset.getUntrackedParameter<bool> ("verifyAdler32", true)),
64  verifyChecksum_(pset.getUntrackedParameter<bool> ("verifyChecksum", true)),
65  useL1EventID_(pset.getUntrackedParameter<bool> ("useL1EventID", false)),
66  fileNames_(pset.getUntrackedParameter<std::vector<std::string>> ("fileNames",std::vector<std::string>())),
67  fileListMode_(pset.getUntrackedParameter<bool> ("fileListMode", false)),
68  fileListLoopMode_(pset.getUntrackedParameter<bool> ("fileListLoopMode", false)),
72  eventID_(),
75  tcds_pointer_(0),
76  eventsThisLumi_(0),
77  dpd_(nullptr)
78 {
79  char thishost[256];
80  gethostname(thishost, 255);
81  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: "
82  << std::endl << (eventChunkSize_/1048576)
83  << " MB on host " << thishost;
84 
85  long autoRunNumber = -1;
86  if (fileListMode_) {
87  autoRunNumber = initFileList();
88  if (!fileListLoopMode_) {
89  if (autoRunNumber<0)
90  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
91  //override run number
92  runNumber_ = (edm::RunNumber_t)autoRunNumber;
93  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
94  }
95  }
96 
98  setNewRun();
99  //todo:autodetect from file name (assert if names differ)
102 
103  std::string defPathSuffix = "src/EventFilter/Utilities/plugins/budef.jsd";
104  defPath_ = std::string(getenv("CMSSW_BASE")) + "/" + defPathSuffix;
105  struct stat statbuf;
106  if (stat(defPath_.c_str(), &statbuf)) {
107  defPath_ = std::string(getenv("CMSSW_RELEASE_BASE")) + "/" + defPathSuffix;
108  if (stat(defPath_.c_str(), &statbuf)) {
109  defPath_ = defPathSuffix;
110  }
111  }
112 
113  dpd_ = new DataPointDefinition();
114  std::string defLabel = "data";
115  DataPointDefinition::getDataPointDefinitionFor(defPath_, dpd_,&defLabel);
116 
117  //make sure that chunk size is N * block size
122 
123  if (!numBuffers_)
124  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") <<
125  "no reading enabled with numBuffers parameter 0";
126 
130 
131  if (!crc32c_hw_test())
132  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
133 
134  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
135  if (fileListMode_) {
136  try {
138  } catch(cms::Exception e) {
139  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
140  }
141  }
142  else {
144  if (!fms_) {
145  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
146  }
147  }
148 
150  if (!daqDirector_)
151  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
152 
153  //set DaqDirector to delete files in preGlobalEndLumi callback
155  if (fms_) {
157  fms_->setInputSource(this);
160  }
161  //should delete chunks when run stops
162  for (unsigned int i=0;i<numBuffers_;i++) {
164  }
165 
166  quit_threads_ = false;
167 
168  for (unsigned int i=0;i<numConcurrentReads_;i++)
169  {
170  std::unique_lock<std::mutex> lk(startupLock_);
171  //issue a memory fence here and in threads (constructor was segfaulting without this)
172  thread_quit_signal.push_back(false);
173  workerJob_.push_back(ReaderInfo(nullptr,nullptr));
174  cvReader_.push_back(new std::condition_variable);
175  tid_active_.push_back(0);
176  threadInit_.store(false,std::memory_order_release);
177  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker,this,i));
178  startupCv_.wait(lk);
179  }
180 
181  runAuxiliary()->setProcessHistoryID(processHistoryID_);
182 }
int i
Definition: DBlmapReader.cc:9
std::vector< std::string > fileNames_
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:356
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
bool crc32c_hw_test()
Definition: crc32c.cc:354
jsoncollector::DataPointDefinition * dpd_
void setInState(FastMonitoringThread::InputState inputState)
assert(m_qm.get())
void setInStateSup(FastMonitoringThread::InputState inputState)
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, InputFile * >> *filesToDelete)
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
std::list< std::pair< int, InputFile * > > filesToDelete_
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:264
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:357
std::atomic< bool > threadInit_
std::pair< InputFile *, InputChunk * > ReaderInfo
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:359
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
FedRawDataInputSource::~FedRawDataInputSource ( )
virtual

Definition at line 184 of file FedRawDataInputSource.cc.

References cvReader_, deleteFile(), filesToDelete_, i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

185 {
186  quit_threads_=true;
187 
188  //delete any remaining open files
189  for (auto it = filesToDelete_.begin();it!=filesToDelete_.end();it++) {
190  deleteFile(it->second->fileName_);
191  delete it->second;
192  }
194  readSupervisorThread_->join();
195  }
196  else {
197  //join aux threads in case the supervisor thread was not started
198  for (unsigned int i=0;i<workerThreads_.size();i++) {
199  std::unique_lock<std::mutex> lk(mReader_);
200  thread_quit_signal[i]=true;
201  cvReader_[i]->notify_one();
202  lk.unlock();
203  workerThreads_[i]->join();
204  delete workerThreads_[i];
205  }
206  }
207  for (unsigned int i=0;i<numConcurrentReads_;i++) delete cvReader_[i];
208  /*
209  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
210  InputChunk *ch;
211  while (!freeChunks_.try_pop(ch)) {}
212  delete ch;
213  }
214  */
215 }
int i
Definition: DBlmapReader.cc:9
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::vector< std::thread * > workerThreads_
std::list< std::pair< int, InputFile * > > filesToDelete_
std::vector< unsigned int > thread_quit_signal
void deleteFile(std::string const &)

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 234 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, evf::EvFDaqDirector::emptyLumisectionMode(), event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fms_, newFWLiteAna::found, fuOutputDir_, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

235 {
237  {
238  //late init of directory variable
240 
241  //this thread opens new files and dispatches reading to worker readers
242  //threadInit_.store(false,std::memory_order_release);
243  std::unique_lock<std::mutex> lk(startupLock_);
244  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor,this));
246  startupCv_.wait(lk);
247  }
248  //signal hltd to start event accounting
252  switch (nextEvent() ) {
254  //maybe create EoL file in working directory before ending run
255  struct stat buf;
256  if ( currentLumiSection_ > 0) {
257  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
258  if (eolFound) {
260  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261  if ( !found ) {
263  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(),&buf) == 0);
271  if (!eorFound) {
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
273  close(eor_fd);
274  }
276  eventsThisLumi_=0;
278  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
279  return false;
280  }
282  //this is not reachable
283  return true;
284  }
286  //std::cout << "--------------NEW LUMI---------------" << std::endl;
287  return true;
288  }
289  default: {
290  if (!getLSFromFilename_) {
291  //get new lumi from file header
292  if (event_->lumi() > currentLumiSection_) {
294  eventsThisLumi_=0;
295  maybeOpenNewLumiSection( event_->lumi() );
296  }
297  }
298  if (fileListLoopMode_)
300  else
301  eventRunNumber_=event_->run();
302  L1EventID_ = event_->event();
303 
304  setEventCached();
305 
306  return true;
307  }
308  }
309 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void createProcessingNotificationMaybe() const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setEventCached()
[Brief garbled by Doxygen: it is a fragment of the neighbouring accessor "eventCached() const {return eventCached_;}" and does not describe setEventCached().]
Definition: InputSource.h:388
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:371
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
bool emptyLumisectionMode() const
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
void FedRawDataInputSource::createBoLSFile ( const uint32_t  lumiSection,
bool  checkIfExists 
)
private

Definition at line 311 of file FedRawDataInputSource.cc.

References daqDirector_, evf::EvFDaqDirector::getBoLSFilePathOnFU(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by maybeOpenNewLumiSection().

312 {
313  //used for backpressure mechanisms and monitoring
314  const std::string fuBoLS = daqDirector_->getBoLSFilePathOnFU(lumiSection);
315  struct stat buf;
316  if (checkIfExists==false || stat(fuBoLS.c_str(), &buf) != 0) {
317  int bol_fd = open(fuBoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
318  close(bol_fd);
319  }
320 }
std::string getBoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector * daqDirector_
void FedRawDataInputSource::deleteFile ( std::string const &  fileName)
private

Definition at line 643 of file FedRawDataInputSource.cc.

References cppFunctionSkipper::exception, fileListMode_, MillePedeFileConverter_cfg::fileName, LogDebug, fed_dqm_sourceclient-live_cfg::path, and MatrixUtil::remove().

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

644 {
645  //no deletion in file list mode
646  if (fileListMode_) return;
647 
648  const boost::filesystem::path filePath(fileName);
649  LogDebug("FedRawDataInputSource") << "Deleting input file -:" << fileName;
650  try {
651  //sometimes this fails but file gets deleted
652  boost::filesystem::remove(filePath);
653  }
654  catch (const boost::filesystem::filesystem_error& ex)
655  {
656  edm::LogError("FedRawDataInputSource") << " - deleteFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what()
657  << ". Trying again.";
658  usleep(100000);
659  try {
660  boost::filesystem::remove(filePath);
661  }
662  catch (const boost::filesystem::filesystem_error&) {/*file gets deleted first time but exception is still thrown*/}
663  }
664  catch (std::exception& ex)
665  {
666  edm::LogError("FedRawDataInputSource") << " - deleteFile std::exception CAUGHT -: " << ex.what()
667  << ". Trying again.";
668  usleep(100000);
669  try {
670  boost::filesystem::remove(filePath);
671  } catch (std::exception&) {/*file gets deleted first time but exception is still thrown*/}
672  }
673 }
#define LogDebug(id)
bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 74 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance(), and getNextEvent().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 217 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

218 {
220  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
221  desc.addUntracked<unsigned int> ("eventChunkSize",32)->setComment("Input buffer (chunk) size");
222  desc.addUntracked<unsigned int> ("eventChunkBlock",32)->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
223  desc.addUntracked<unsigned int> ("numBuffers",2)->setComment("Number of buffers used for reading input");
224  desc.addUntracked<unsigned int> ("maxBufferedFiles",2)->setComment("Maximum number of simultaneously buffered raw files");
225  desc.addUntracked<bool> ("verifyAdler32", true)->setComment("Verify event Adler32 checksum with FRDv3 or v4");
226  desc.addUntracked<bool> ("verifyChecksum", true)->setComment("Verify event CRC-32C checksum of FRDv5 or higher");
227  desc.addUntracked<bool> ("useL1EventID", false)->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
228  desc.addUntracked<bool> ("fileListMode", false)->setComment("Use fileNames parameter to directly specify raw files to open");
229  desc.addUntracked<std::vector<std::string>> ("fileNames", std::vector<std::string>())->setComment("file list used when fileListMode is enabled");
230  desc.setAllowAnything();
231  descriptions.add("source", desc);
232 }
void setComment(std::string const &value)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData)
private

Definition at line 748 of file FedRawDataInputSource.cc.

References assert(), FEDRawData::data(), event(), event_, fedt_struct::eventsize, evf::evtn::evm_board_sense(), Exception, FED_EVSZ_EXTRACT, FED_SOID_EXTRACT, FEDRawDataCollection::FEDData(), l1tstage2_dqm_sourceclient-live_cfg::fedId, evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), fedh_struct::sourceid, and tcds_pointer_.

Referenced by read().

749 {
750  edm::TimeValue_t time;
751  timeval stv;
752  gettimeofday(&stv,0);
753  time = stv.tv_sec;
754  time = (time << 32) + stv.tv_usec;
755  edm::Timestamp tstamp(time);
756 
757  uint32_t eventSize = event_->eventSize();
758  char* event = (char*)event_->payload();
759  GTPEventID_=0;
760  tcds_pointer_ = 0;
761  while (eventSize > 0) {
762  assert(eventSize>=sizeof(fedt_t));
763  eventSize -= sizeof(fedt_t);
764  const fedt_t* fedTrailer = (fedt_t*) (event + eventSize);
765  const uint32_t fedSize = FED_EVSZ_EXTRACT(fedTrailer->eventsize) << 3; //trailer length counts in 8 bytes
766  assert(eventSize>=fedSize - sizeof(fedt_t));
767  eventSize -= (fedSize - sizeof(fedt_t));
768  const fedh_t* fedHeader = (fedh_t *) (event + eventSize);
769  const uint16_t fedId = FED_SOID_EXTRACT(fedHeader->sourceid);
770  if(fedId>FEDNumbering::MAXFEDID)
771  {
772  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
773  }
774  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
775  tcds_pointer_ = (unsigned char *)(event + eventSize );
776  }
777  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
778  if (evf::evtn::evm_board_sense((unsigned char*) fedHeader,fedSize))
779  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,true);
780  else
781  GTPEventID_ = evf::evtn::get((unsigned char*) fedHeader,false);
782  //evf::evtn::evm_board_setformat(fedSize);
783  const uint64_t gpsl = evf::evtn::getgpslow((unsigned char*) fedHeader);
784  const uint64_t gpsh = evf::evtn::getgpshigh((unsigned char*) fedHeader);
785  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t> ((gpsh << 32) + gpsl));
786  }
787  //take event ID from GTPE FED
788  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_==0) {
789  if (evf::evtn::gtpe_board_sense((unsigned char*)fedHeader)) {
790  GTPEventID_ = evf::evtn::gtpe_get((unsigned char*) fedHeader);
791  }
792  }
793  FEDRawData& fedData = rawData.FEDData(fedId);
794  fedData.resize(fedSize);
795  memcpy(fedData.data(), event + eventSize, fedSize);
796  }
797  assert(eventSize == 0);
798 
799  return tstamp;
800 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
unsigned int get(const unsigned char *, bool)
assert(m_qm.get())
unsigned int sourceid
Definition: fed_header.h:32
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:32
struct fedt_struct fedt_t
#define FED_EVSZ_EXTRACT(a)
Definition: fed_trailer.h:36
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION [A criterion that matches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger but the state exists so we define the behavior If all triggers are the negative criterion will lead to accepting the event (this again matches the behavior of "!*" before the partial wildcard feature was incorporated). The per-event "cost" of each negative criterion with multiple relevant triggers is about the same as !* was in the past
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:15
std::unique_ptr< FRDEventMsgView > event_
unsigned int eventsize
Definition: fed_trailer.h:33
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:28
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
#define FED_SOID_EXTRACT(a)
Definition: fed_header.h:53
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1404 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1405 {
1406  std::lock_guard<std::mutex> lock(monlock_);
1407  auto itr = sourceEventsReport_.find(lumi);
1408  if (itr!=sourceEventsReport_.end()) {
1409  auto && ret = std::pair<bool,unsigned int>(true,itr->second);
1410  if (erase)
1411  sourceEventsReport_.erase(itr);
1412  return ret;
1413  }
1414  else
1415  return std::pair<bool,unsigned int>(false,0);
1416 }
tuple ret
prodAgent to be discontinued
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > sourceEventsReport_
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1447 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, fed_dqm_sourceclient-live_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1448 {
1449  if (fileListIndex_ < fileNames_.size()) {
1450  nextFile = fileNames_[fileListIndex_];
1451  if (nextFile.find("file://")==0) nextFile=nextFile.substr(7);
1452  else if (nextFile.find("file:")==0) nextFile=nextFile.substr(5);
1453  boost::filesystem::path fileName = nextFile;
1454  std::string fileStem = fileName.stem().string();
1455  if (fileStem.find("ls")) fileStem = fileStem.substr(fileStem.find("ls")+2);
1456  if (fileStem.find("_")) fileStem = fileStem.substr(0,fileStem.find("_"));
1457 
1458  if (!fileListLoopMode_)
1459  ls = boost::lexical_cast<unsigned int>(fileStem);
1460  else //always starting from LS 1 in loop mode
1461  ls = 1 + loopModeIterationInc_;
1462 
1463  //fsize = 0;
1464  //lockWaitTime = 0;
1465  fileListIndex_++;
1467  }
1468  else {
1469  if (!fileListLoopMode_)
1471  else {
1472  //loop through files until interrupted
1474  fileListIndex_=0;
1475  return getFile(ls,nextFile,fsize,lockWaitTime);
1476  }
1477  }
1478 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls
Definition: eostools.py:348
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 373 of file FedRawDataInputSource.cc.

References InputFile::advance(), assert(), bufferInputRead_, InputFile::bufferPosition_, checkEvery_, chunkIsFree_, InputFile::chunkPosition_, InputFile::chunks_, crc32c(), InputFile::currentChunk_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, deleteFile(), detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, exceptionState(), fileDeleteLock_, fileListMode_, InputFile::fileName_, fileQueue_, InputFile::fileSize_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::EvFDaqDirector::getStreamFileTracker(), evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, InputFile::lumi_, maybeOpenNewLumiSection(), InputFile::moveToPreviousChunk(), mWakeup_, InputFile::nChunks_, InputFile::nEvents_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, InputFile::nProcessed_, nStreams_, InputFile::parent_, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), InputFile::rewindChunk(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, InputFile::status_, streamFileTrackerPtr_, threadError(), mps_check::timeout, evf::EvFDaqDirector::updateFileIndex(), verifyAdler32_, verifyChecksum_, and InputFile::waitForChunk().

Referenced by nextEvent().

374 {
375 
377  if (!currentFile_)
378  {
379  if (!streamFileTrackerPtr_) {
383  }
384 
387  if (!fileQueue_.try_pop(currentFile_))
388  {
389  //sleep until wakeup (only in single-buffer mode) or timeout
390  std::unique_lock<std::mutex> lkw(mWakeup_);
391  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_)
393  }
394  status = currentFile_->status_;
395  if ( status == evf::EvFDaqDirector::runEnded)
396  {
398  delete currentFile_;
399  currentFile_=nullptr;
400  return status;
401  }
402  else if ( status == evf::EvFDaqDirector::runAbort)
403  {
404  throw cms::Exception("FedRawDataInputSource::getNextEvent") << "Run has been aborted by the input source reader thread";
405  }
406  else if (status == evf::EvFDaqDirector::newLumi)
407  {
409  if (getLSFromFilename_) {
412  eventsThisLumi_=0;
414  }
415  }
416  else {//let this be picked up from next event
418  }
419 
420  delete currentFile_;
421  currentFile_=nullptr;
422  return status;
423  }
424  else if (status == evf::EvFDaqDirector::newFile) {
427  }
428  else
429  assert(0);
430  }
432 
433  //file is empty
434  if (!currentFile_->fileSize_) {
436  //try to open new lumi
438  if (getLSFromFilename_)
441  eventsThisLumi_=0;
443  }
444  //immediately delete empty file
446  delete currentFile_;
447  currentFile_=nullptr;
449  }
450 
451  //file is finished
454  //release last chunk (it is never released elsewhere)
457  {
458  throw cms::Exception("FedRawDataInputSource::getNextEvent")
459  << "Fully processed " << currentFile_->nProcessed_
460  << " from the file " << currentFile_->fileName_
461  << " but according to BU JSON there should be "
462  << currentFile_->nEvents_ << " events";
463  }
464  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
465  if (singleBufferMode_) {
466  std::unique_lock<std::mutex> lkw(mWakeup_);
467  cvWakeup_.notify_one();
468  }
471  //put the file in pending delete list;
472  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
473  filesToDelete_.push_back(std::pair<int,InputFile*>(currentFileIndex_,currentFile_));
474  }
475  else {
476  //in single-thread and stream jobs, events are already processed
478  delete currentFile_;
479  }
480  currentFile_=nullptr;
482  }
483 
484 
485  //file is too short
487  {
488  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
489  "Premature end of input file while reading event header";
490  }
491  if (singleBufferMode_) {
492 
493  //should already be there
496  usleep(10000);
498  }
500 
501  unsigned char *dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
502 
503  //conditions when read amount is not sufficient for the header to fit
504  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]
506  {
508 
509  if (detectedFRDversion_==0) {
510  detectedFRDversion_=*((uint32*)dataPosition);
511  if (detectedFRDversion_>5)
512  throw cms::Exception("FedRawDataInputSource::getNextEvent")
513  << "Unknown FRD version -: " << detectedFRDversion_;
514  assert(detectedFRDversion_>=1);
515  }
516 
517  //recalculate chunk position
518  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
519  if ( bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_])
520  {
521  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
522  "Premature end of input file while reading event header";
523  }
524  }
525 
526  event_.reset( new FRDEventMsgView(dataPosition) );
527  if (event_->size()>eventChunkSize_) {
528  throw cms::Exception("FedRawDataInputSource::getNextEvent")
529  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
530  << " run:" << event_->run() << " of size:" << event_->size()
531  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
532  }
533 
534  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
535 
537  {
538  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
539  "Premature end of input file while reading event data";
540  }
541  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
543  //recalculate chunk position
544  dataPosition = currentFile_->chunks_[0]->buf_+ currentFile_->chunkPosition_;
545  event_.reset( new FRDEventMsgView(dataPosition) );
546  }
547  currentFile_->bufferPosition_ += event_->size();
548  currentFile_->chunkPosition_ += event_->size();
549  //last chunk is released when this function is invoked next time
550 
551  }
552  //multibuffer mode:
553  else
554  {
555  //wait for the current chunk to become added to the vector
558  usleep(10000);
560  }
562 
563  //check if header is at the boundary of two chunks
564  chunkIsFree_ = false;
565  unsigned char *dataPosition;
566 
567  //read header, copy it to a single chunk if necessary
568  bool chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]);
569 
570  event_.reset( new FRDEventMsgView(dataPosition) );
571  if (event_->size()>eventChunkSize_) {
572  throw cms::Exception("FedRawDataInputSource::getNextEvent")
573  << " event id:"<< event_->event()<< " lumi:" << event_->lumi()
574  << " run:" << event_->run() << " of size:" << event_->size()
575  << " bytes does not fit into a chunk of size:" << eventChunkSize_ << " bytes";
576  }
577 
578  const uint32_t msgSize = event_->size()-FRDHeaderVersionSize[detectedFRDversion_];
579 
581  {
582  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
583  "Premature end of input file while reading event data";
584  }
585 
586  if (chunkEnd) {
587  //header was at the chunk boundary, we will have to move payload as well
588  currentFile_->moveToPreviousChunk(msgSize,FRDHeaderVersionSize[detectedFRDversion_]);
589  chunkIsFree_ = true;
590  }
591  else {
592  //header was contiguous, but check if payload fits the chunk
593  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
594  //rewind to header start position
595  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
596  //copy event to a chunk start and move pointers
597  chunkEnd = currentFile_->advance(dataPosition,FRDHeaderVersionSize[detectedFRDversion_]+msgSize);
598  assert(chunkEnd);
599  chunkIsFree_=true;
600  //header is moved
601  event_.reset( new FRDEventMsgView(dataPosition) );
602  }
603  else {
604  //everything is in a single chunk, only move pointers forward
605  chunkEnd = currentFile_->advance(dataPosition,msgSize);
606  assert(!chunkEnd);
607  chunkIsFree_=false;
608  }
609  }
610  }//end multibuffer mode
612 
613  if (verifyChecksum_ && event_->version() >= 5)
614  {
615  uint32_t crc=0;
616  crc = crc32c(crc,(const unsigned char*)event_->payload(),event_->eventSize());
617  if ( crc != event_->crc32c() ) {
619  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
620  "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() <<
621  " but calculated 0x" << crc;
622  }
623  }
624  else if ( verifyAdler32_ && event_->version() >= 3)
625  {
626  uint32_t adler = adler32(0L,Z_NULL,0);
627  adler = adler32(adler,(Bytef*)event_->payload(),event_->eventSize());
628 
629  if ( adler != event_->adler32() ) {
631  throw cms::Exception("FedRawDataInputSource::getNextEvent") <<
632  "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() <<
633  " but calculated 0x" << adler;
634  }
635  }
637 
639 
641 }
unsigned int lumi_
uint32_t chunkPosition_
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
std::vector< int > * getStreamFileTracker()
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
void rewindChunk(const size_t size)
const uint32 FRDHeaderVersionSize[6]
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
evf::EvFDaqDirector::FileStatus status_
int timeout
Definition: mps_check.py:51
FedRawDataInputSource * parent_
uint32_t bufferPosition_
void updateFileIndex(int const &fileIndex)
bool advance(unsigned char *&dataPosition, const size_t size)
void moveToPreviousChunk(const size_t size, const size_t offset)
bool waitForChunk(unsigned int chunkid)
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
tbb::concurrent_queue< InputFile * > fileQueue_
std::unique_ptr< FRDEventMsgView > event_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
tuple status
Definition: mps_update.py:57
int FedRawDataInputSource::grabNextJsonFile ( boost::filesystem::path const &  jsonSourcePath)
private

Definition at line 802 of file FedRawDataInputSource.cc.

References filterCSVwithJSON::copy, daqDirector_, data, jsoncollector::DataPoint::deserialize(), reco::dp, dpd_, alignCSCRings::e, cppFunctionSkipper::exception, Exception, fuOutputDir_, jsoncollector::DataPoint::getData(), Json::Reader::getFormatedErrorMessages(), jsoncollector::DataPointDefinition::getNames(), i, LogDebug, Json::Reader::parse(), fed_dqm_sourceclient-live_cfg::path, matplotRender::reader, MatrixUtil::remove(), contentValuesCheck::ss, AlCaHLTBitMon_QueryRunRegistry::string, summarizeEdmComparisonLogfiles::success, and evf::EvFDaqDirector::unlockFULocal().

Referenced by readSupervisor().

803 {
805  try {
806  // assemble json destination path
808 
809  //TODO:should be ported to use fffnaming
810  std::ostringstream fileNameWithPID;
811  fileNameWithPID << jsonSourcePath.stem().string() << "_pid"
812  << std::setfill('0') << std::setw(5) << getpid() << ".jsn";
813  jsonDestPath /= fileNameWithPID.str();
814 
815  LogDebug("FedRawDataInputSource") << "JSON rename -: " << jsonSourcePath << " to "
816  << jsonDestPath;
817  try {
818  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
819  }
820  catch (const boost::filesystem::filesystem_error& ex)
821  {
822  // Input dir gone?
823  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
824  // << " Maybe the file is not yet visible by FU. Trying again in one second";
825  sleep(1);
826  boost::filesystem::copy(jsonSourcePath,jsonDestPath);
827  }
829 
830  try {
831  //sometimes this fails but file gets deleted
832  boost::filesystem::remove(jsonSourcePath);
833  }
834  catch (const boost::filesystem::filesystem_error& ex)
835  {
836  // Input dir gone?
837  edm::LogError("FedRawDataInputSource") << "grabNextFile BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
838  }
839  catch (std::exception& ex)
840  {
841  // Input dir gone?
842  edm::LogError("FedRawDataInputSource") << "grabNextFile std::exception CAUGHT -: " << ex.what();
843  }
844 
845  boost::filesystem::ifstream ij(jsonDestPath);
846  Json::Value deserializeRoot;
848 
849  std::stringstream ss;
850  ss << ij.rdbuf();
851  if (!reader.parse(ss.str(), deserializeRoot)) {
852  edm::LogError("FedRawDataInputSource") << "Failed to deserialize JSON file -: " << jsonDestPath
853  << "\nERROR:\n" << reader.getFormatedErrorMessages()
854  << "CONTENT:\n" << ss.str()<<".";
855  throw std::runtime_error("Cannot deserialize input JSON file");
856  }
857 
858  //read BU JSON
860  DataPoint dp;
861  dp.deserialize(deserializeRoot);
862  bool success = false;
863  for (unsigned int i=0;i<dpd_->getNames().size();i++) {
864  if (dpd_->getNames().at(i)=="NEvents")
865  if (i<dp.getData().size()) {
866  data = dp.getData()[i];
867  success=true;
868  }
869  }
870  if (!success) {
871  if (dp.getData().size())
872  data = dp.getData()[0];
873  else
874  throw cms::Exception("FedRawDataInputSource::grabNextJsonFile") <<
875  " error reading number of events from BU JSON -: No input value " << data;
876  }
877  return boost::lexical_cast<int>(data);
878 
879  }
880  catch (const boost::filesystem::filesystem_error& ex)
881  {
882  // Input dir gone?
884  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - BOOST FILESYSTEM ERROR CAUGHT -: " << ex.what();
885  }
886  catch (std::runtime_error e)
887  {
888  // Another process grabbed the file and NFS did not register this
890  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - runtime Exception -: " << e.what();
891  }
892 
893  catch( boost::bad_lexical_cast const& ) {
894  edm::LogError("FedRawDataInputSource") << "grabNextJsonFile - error parsing number of events from BU JSON. "
895  << "Input value is -: " << data;
896  }
897 
898  catch (std::exception e)
899  {
900  // BU run directory disappeared?
902  edm::LogError("FedRawDataInputSource") << "grabNextFile - SOME OTHER EXCEPTION OCCURED!!!! -: " << e.what();
903  }
904 
905  return -1;
906 }
#define LogDebug(id)
std::vector< std::string > & getData()
Definition: DataPoint.h:58
int i
Definition: DBlmapReader.cc:9
jsoncollector::DataPointDefinition * dpd_
bool parse(const std::string &document, Value &root, bool collectComments=true)
Read a Value from a JSON document.
Represents a JSON value.
Definition: value.h:111
virtual void deserialize(Json::Value &root)
Definition: DataPoint.cc:56
auto dp
Definition: deltaR.h:22
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
evf::EvFDaqDirector * daqDirector_
Unserialize a JSON document into a Value.
Definition: reader.h:16
std::vector< std::string > const & getNames()
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1418 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, fed_dqm_sourceclient-live_cfg::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1419 {
1420  std::sort(fileNames_.begin(),fileNames_.end(),
1421  [](std::string a, std::string b) {
1422  if (a.rfind("/")!=std::string::npos) a=a.substr(a.rfind("/"));
1423  if (b.rfind("/")!=std::string::npos) b=b.substr(b.rfind("/"));
1424  return b > a;});
1425 
1426  if (fileNames_.size()) {
1427  //get run number from first file in the vector
1429  std::string fileStem = fileName.stem().string();
1430  auto end = fileStem.find("_");
1431  if (fileStem.find("run")==0) {
1432  std::string runStr = fileStem.substr(3,end-3);
1433  try {
1434  //get long to support test run numbers < 2^32
1435  long rval = boost::lexical_cast<long>(runStr);
1436  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: "<< rval;
1437  return rval;
1438  }
1439  catch( boost::bad_lexical_cast const& ) {
1440  edm::LogWarning("FedRawDataInputSource") << "Unable to autodetect run number in fileListMode from file -: "<< fileName;
1441  }
1442  }
1443  }
1444  return -1;
1445 }
std::vector< std::string > fileNames_
#define end
Definition: vmac.h:37
unsigned long long int rval
Definition: vlib.h:22
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 322 of file FedRawDataInputSource.cc.

References createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), AlCaHLTBitMon_QueryRunRegistry::string, and evf::EvFDaqDirector::unlockFULocal2().

Referenced by checkNextEvent(), and getNextEvent().

323 {
325  || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
326 
327  if ( currentLumiSection_ > 0) {
328  const std::string fuEoLS =
330  struct stat buf;
331  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
332  if ( !found ) {
334  int eol_fd = open(fuEoLS.c_str(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
335  close(eol_fd);
336  createBoLSFile(lumiSection,false);
338  }
339  }
340  else createBoLSFile(lumiSection,true);//needed for initial lumisection
341 
342  currentLumiSection_ = lumiSection;
343 
345 
346  timeval tv;
347  gettimeofday(&tv, 0);
348  const edm::Timestamp lsopentime( (unsigned long long) tv.tv_sec * 1000000 + (unsigned long long) tv.tv_usec );
349 
350  edm::LuminosityBlockAuxiliary* lumiBlockAuxiliary =
352  runAuxiliary()->run(),
353  lumiSection, lsopentime,
355 
356  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
357  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
358 
359  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: "<< lumiSection;
360  }
361 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:586
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:363
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:592
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:264
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:371
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:267
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 363 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNextEvent().

364 {
366  while ((status = getNextEvent())==evf::EvFDaqDirector::noFile)
367  {
368  if (edm::shutdown_flag.load(std::memory_order_relaxed)) break;
369  }
370  return status;
371 }
volatile std::atomic< bool > shutdown_flag
def load
Definition: svgfig.py:546
evf::EvFDaqDirector::FileStatus getNextEvent()
tuple status
Definition: mps_update.py:57
void FedRawDataInputSource::postForkReacquireResources ( std::shared_ptr< edm::multicore::MessageReceiverForSource )
overrideprivatevirtual

Definition at line 911 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), edm::Timestamp::invalidTimestamp(), runNumber_, and edm::InputSource::setRunAuxiliary().

912 {
913  InputSource::rewind();
917 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
static Timestamp beginOfTime()
Definition: Timestamp.h:103
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:359
void FedRawDataInputSource::preForkReleaseResources ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 908 of file FedRawDataInputSource.cc.

909 {} // empty body: this override performs no work
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 676 of file FedRawDataInputSource.cc.

References assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, InputFile::chunks_, InputFile::currentChunk_, currentFile_, currentLumiSection_, daqProvenanceHelper_, deleteFile(), edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), InputFile::nProcessed_, nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), record, evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTrackerPtr_, tcds_pointer_, and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

677 {
679  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
680  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData);
681 
682  if (useL1EventID_){
684  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
686  aux.setProcessHistoryID(processHistoryID_);
687  makeEvent(eventPrincipal, aux);
688  }
689  else if(tcds_pointer_==0){
692  edm::EventAuxiliary aux(eventID_, processGUID(), tstamp, true,
694  aux.setProcessHistoryID(processHistoryID_);
695  makeEvent(eventPrincipal, aux);
696  }
697  else{
698  evf::evtn::TCDSRecord record((unsigned char *)(tcds_pointer_));
703  makeEvent(eventPrincipal, aux);
704  }
705 
706 
707 
708  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
709 
710  //FWCore/Sources DaqProvenanceHelper before 7_1_0_pre3
711  //eventPrincipal.put(daqProvenanceHelper_.constBranchDescription_, edp,
712  // daqProvenanceHelper_.dummyProvenance_);
713 
714  eventPrincipal.put(daqProvenanceHelper_.branchDescription(), std::move(edp),
716 
717  eventsThisLumi_++;
719 
720  //this old file check runs no more often than every 10 events
721  if (!((currentFile_->nProcessed_-1)%(checkEvery_))) {
722  //delete files that are not in processing
723  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
724  auto it = filesToDelete_.begin();
725  while (it!=filesToDelete_.end()) {
726  bool fileIsBeingProcessed = false;
727  for (unsigned int i=0;i<nStreams_;i++) {
728  if (it->first == streamFileTrackerPtr_->at(i)) {
729  fileIsBeingProcessed = true;
730  break;
731  }
732  }
733  if (!fileIsBeingProcessed) {
734  deleteFile(it->second->fileName_);
735  delete it->second;
736  it = filesToDelete_.erase(it);
737  }
738  else it++;
739  }
740 
741  }
743  chunkIsFree_=false;
745  return;
746 }
int i
Definition: DBlmapReader.cc:9
JetCorrectorParameters::Record record
Definition: classes.h:7
void setInState(FastMonitoringThread::InputState inputState)
assert(m_qm.get())
std::vector< int > * streamFileTrackerPtr_
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:219
edm::EventAuxiliary makeEventAuxiliary(TCDSRecord *record, unsigned int runNumber, unsigned int lumiSection, std::string const &processGUID, bool verifyLumiSection)
def move
Definition: eostools.py:510
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::list< std::pair< int, InputFile * > > filesToDelete_
unsigned int nProcessed_
unsigned int currentChunk_
void setProcessHistoryID(ProcessHistoryID const &phid)
tbb::concurrent_vector< InputChunk * > chunks_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
void deleteFile(std::string const &)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1339 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, i, plotBeamSpotDB::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1340 {
1341 
1342  if (fileDescriptor_<0) {
1343  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1344  bufferInputRead_ = 0;
1345  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1346  if (fileDescriptor_>=0)
1347  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1348  else
1349  {
1350  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer") << "failed to open file " << std::endl
1351  << file->fileName_ << " fd:" << fileDescriptor_;
1352  }
1353  }
1354 
1355  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1356  uint32_t existingSize = 0;
1357  for (unsigned int i=0;i<readBlocks_;i++)
1358  {
1359  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1360  bufferInputRead_+=last;
1361  existingSize+=last;
1362  }
1363  }
1364  else {
1365  const uint32_t chunksize = file->chunkPosition_;
1366  const uint32_t blockcount=chunksize/eventChunkBlock_;
1367  const uint32_t leftsize = chunksize%eventChunkBlock_;
1368  uint32_t existingSize = eventChunkSize_ - file->chunkPosition_;
1369  memmove((void*) file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSize);
1370 
1371  for (uint32_t i=0;i<blockcount;i++) {
1372  const ssize_t last = ::read(fileDescriptor_,( void*) (file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1373  bufferInputRead_+=last;
1374  existingSize+=last;
1375  }
1376  if (leftsize) {
1377  const ssize_t last = ::read(fileDescriptor_,( void*)( file->chunks_[0]->buf_ + existingSize ), leftsize);
1378  bufferInputRead_+=last;
1379  }
1380  file->chunkPosition_=0;//data was moved to beginning of the chunk
1381  }
1382  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1383  if (fileDescriptor_!=-1)
1384  {
1385  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1386  close(fileDescriptor_);
1387  fileDescriptor_=-1;
1388  }
1389  }
1390 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
uint32_t chunkPosition_
virtual void read(edm::EventPrincipal &eventPrincipal) override
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 923 of file FedRawDataInputSource.cc.

References assert(), InputFile::chunks_, counter, cvReader_, cvWakeup_, daqDirector_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, grabNextJsonFile(), i, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, j, LogDebug, eostools::ls(), maxBufferedFiles_, mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, fed_dqm_sourceclient-live_cfg::path, quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), workerJob_, workerPool_, and workerThreads_.

Referenced by checkNextEvent().

// readSupervisor(): loop that looks for new input files, queues them on fileQueue_
// and hands chunk-read jobs to worker threads; on exit it signals and joins the workers.
// NOTE: this is a rendered listing; the embedded source line numbers have gaps
// (e.g. 943, 949, 952-954, 983-985, 988), so some statements are not shown here.
924 {
925  bool stop=false;
926  unsigned int currentLumiSection = 0;
927  //threadInit_.exchange(true,std::memory_order_acquire);
928 
// Notify startupCv_ while holding startupLock_.
929  {
930  std::unique_lock<std::mutex> lk(startupLock_);
931  startupCv_.notify_one();
932  }
933 
934  uint32_t ls=0;
935  uint32_t monLS=1;
936  uint32_t lockCount=0;
937  uint64_t sumLockWaitTimeUs=0.;
938 
// Main loop: runs until 'stop' is set or one of the 'break' statements below is hit.
939  while (!stop) {
940 
941  //wait for at least one free thread and chunk
942  int counter=0;
// (the statement heading this block, source line 943, is absent from the listing)
944  {
945  //report state to monitoring
946  if (fms_) {
947  bool copy_active=false;
948  for (auto j : tid_active_) if (j) copy_active=true;
950  else if (freeChunks_.empty()) {
951  if (copy_active)
953  else
955  }
956  else {
957  if (copy_active)
959  else
961  }
962  }
963  std::unique_lock<std::mutex> lkw(mWakeup_);
964  //sleep until woken up by condition or a timeout
965  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
966  counter++;
967  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
968  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
969  }
970  else {
971  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
972  }
// Leave the wait as soon as either the local quit flag or the global shutdown flag is set.
973  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {stop=true;break;}
974  }
975 
976  if (stop) break;
977 
978  //look for a new file
979  std::string nextFile;
980  uint32_t fileSize;
981 
982  if (fms_) {
986  }
987 
989 
// Poll for a file until the status is something other than noFile.
// ('status' is declared on a source line that is absent from this listing.)
990  while (status == evf::EvFDaqDirector::noFile) {
991  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
992  stop=true;
993  break;
994  }
995 
996  uint64_t thisLockWaitTimeUs=0.;
997  if (fileListMode_) {
998  //return LS if LS not set, otherwise return file
999  status = getFile(ls,nextFile,fileSize,thisLockWaitTimeUs);
1000  }
1001  else
1002  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1003 
1005 
// A runEnded status reported for a lumisection other than the current one is treated as noFile.
1006  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1007 
1008  //monitoring of lock wait time
1009  if (thisLockWaitTimeUs>0.)
1010  sumLockWaitTimeUs+=thisLockWaitTimeUs;
1011  lockCount++;
// When ls moves past monLS, report the accumulated lock wait and reset the accumulators.
1012  if (ls>monLS) {
1013  monLS=ls;
1014  if (lockCount)
1015  if (fms_) fms_->reportLockWait(monLS,sumLockWaitTimeUs,lockCount);
1016  lockCount=0;
1017  sumLockWaitTimeUs=0;
1018  }
1019 
1020  //check again for any remaining index/EoLS files after EoR file is seen
1021  if ( status == evf::EvFDaqDirector::runEnded && !fileListMode_) {
1023  usleep(100000);
1024  //now all files should have appeared in ramdisk, check again if any raw files were left behind
1025  status = daqDirector_->updateFuLock(ls,nextFile,fileSize,thisLockWaitTimeUs);
1026  if (currentLumiSection!=ls && status==evf::EvFDaqDirector::runEnded) status=evf::EvFDaqDirector::noFile;
1027  }
1028 
1029  if ( status == evf::EvFDaqDirector::runEnded) {
1031  stop=true;
1032  break;
1033  }
1034 
1035  //queue new lumisection
1036  if( getLSFromFilename_ && ls > currentLumiSection) {
1037  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
1038  currentLumiSection = ls;
1039  fileQueue_.push(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
1040  }
1041 
// A file from an earlier lumisection than the current one is logged as an error and ends the loop.
1042  if( getLSFromFilename_ && currentLumiSection>0 && ls < currentLumiSection) {
1043  edm::LogError("FedRawDataInputSource") << "Got old LS ("<<ls<<") file from EvFDAQDirector! Expected LS:" << currentLumiSection<<". Aborting execution."<<std::endl;
1045  stop=true;
1046  break;
1047  }
1048 
// NOTE(review): dbgcount is declared inside the polling loop, so it restarts at 0 on every
// iteration; after the increment it is always 1 and the (dbgcount%20) message below never prints.
1049  int dbgcount=0;
1050  if (status == evf::EvFDaqDirector::noFile) {
1052  dbgcount++;
1053  if (!(dbgcount%20)) LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1054  usleep(100000);
1055  }
1056  }
1057  if ( status == evf::EvFDaqDirector::newFile ) {
1059  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1060 
1061 
// The raw data file name is nextFile with its extension replaced by ".raw".
1062  boost::filesystem::path rawFilePath(nextFile);
1063  std::string rawFile = rawFilePath.replace_extension(".raw").string();
1064 
1065  struct stat st;
1066  int stat_res = stat(rawFile.c_str(),&st);
// A failed stat() sets setExceptionState_ and breaks out of the main loop.
1067  if (stat_res==-1) {
1068  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-"<< rawFile << std::endl;
1069  setExceptionState_=true;
1070  break;
1071  }
// The size returned by the director/getFile is replaced by the size from stat().
// NOTE(review): fileSize is uint32_t, so a st_size of 4 GiB or more would be truncated - confirm files stay below that.
1072  fileSize=st.st_size;
1073 
1074  if (fms_) {
1078  }
// Event count for the new file: in file-list mode 0 for an empty file and -1 otherwise;
// in the other mode it is the value returned by grabNextJsonFile().
1079  int eventsInNewFile;
1080  if (fileListMode_) {
1081  if (fileSize==0) eventsInNewFile=0;
1082  else eventsInNewFile=-1;
1083  }
1084  else {
1085  eventsInNewFile = grabNextJsonFile(nextFile);
1086  assert( eventsInNewFile>=0 );
1087  assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1088  }
1089  //int eventsInNewFile = fileListMode_ ? -1 : grabNextJsonFile(nextFile);
1090  //if (fileListMode_ && fileSize==0) {eventsInNewFile=0;}
1091  //if (!fileListMode_) {
1092  // assert( eventsInNewFile>=0 );
1093  // assert((eventsInNewFile>0) == (fileSize>0));//file without events must be empty
1094  //}
1095 
1096  if (!singleBufferMode_) {
1097  //calculate number of needed chunks
1098  unsigned int neededChunks = fileSize/eventChunkSize_;
1099  if (fileSize%eventChunkSize_) neededChunks++;
1100 
1101  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,neededChunks,eventsInNewFile,this);
1103  fileQueue_.push(newInputFile);
1104 
// One job per chunk: take a worker id and a free chunk, then assign the (file, chunk) pair to that worker.
1105  for (unsigned int i=0;i<neededChunks;i++) {
1106 
1107  if (fms_) {
1108  bool copy_active=false;
1109  for (auto j : tid_active_) if (j) copy_active=true;
1110  if (copy_active)
1112  else
1114  }
1115  //get thread
1116  unsigned int newTid = 0xffffffff;
// NOTE(review): unlike the chunk wait below, this wait does not check quit_threads_.
1117  while (!workerPool_.try_pop(newTid)) {
1118  usleep(100000);
1119  }
1120 
1121  if (fms_) {
1122  bool copy_active=false;
1123  for (auto j : tid_active_) if (j) copy_active=true;
1124  if (copy_active)
1126  else
1128  }
1129  InputChunk * newChunk = nullptr;
1130  while (!freeChunks_.try_pop(newChunk)) {
1131  usleep(100000);
1132  if (quit_threads_.load(std::memory_order_relaxed)) break;
1133  }
1134 
1135  if (newChunk == nullptr) {
1136  //return unused tid if we received shutdown (nullptr chunk)
1137  if (newTid!=0xffffffff) workerPool_.push(newTid);
1138  stop = true;
1139  break;
1140  }
1142 
// The job slot is filled and the worker notified while holding mReader_.
1143  std::unique_lock<std::mutex> lk(mReader_);
1144 
// Every chunk is eventChunkSize_ bytes except possibly the last, which takes the remainder.
1145  unsigned int toRead = eventChunkSize_;
1146  if (i==neededChunks-1 && fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1147  newChunk->reset(i*eventChunkSize_,toRead,i);
1148 
1149  workerJob_[newTid].first=newInputFile;
1150  workerJob_[newTid].second=newChunk;
1151 
1152  //wake up the worker thread
1153  cvReader_[newTid]->notify_one();
1154  }
1155  }
1156  else {
1157  if (!eventsInNewFile) {
1158  //still queue file for lumi update
1159  std::unique_lock<std::mutex> lkw(mWakeup_);
1160  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,0,0,0,this);
1162  fileQueue_.push(newInputFile);
1163  cvWakeup_.notify_one();
// NOTE(review): this 'return' leaves the function entirely, so the main loop ends and the
// worker signal/join section at the bottom is not executed on this path - confirm this is intended.
1164  return;
1165  }
1166  //in single-buffer mode put single chunk in the file and let the main thread read the file
1167  InputChunk * newChunk;
1168  //should be available immediately
// NOTE(review): this wait has no quit_threads_ check.
1169  while(!freeChunks_.try_pop(newChunk)) usleep(100000);
1170 
1171  std::unique_lock<std::mutex> lkw(mWakeup_);
1172 
1173  unsigned int toRead = eventChunkSize_;
1174  if (fileSize%eventChunkSize_) toRead = fileSize%eventChunkSize_;
1175  newChunk->reset(0,toRead,0);
// The chunk is flagged as read-complete here without any read having been issued in this function.
1176  newChunk->readComplete_=true;
1177 
1178  //push file and wakeup main thread
1179  InputFile * newInputFile = new InputFile(evf::EvFDaqDirector::FileStatus::newFile,ls,rawFile,fileSize,1,eventsInNewFile,this);
1180  newInputFile->chunks_[0]=newChunk;
1182  fileQueue_.push(newInputFile);
1183  cvWakeup_.notify_one();
1184  }
1185  }
1186  }
1188  //make sure threads finish reading
// Pop each worker id from workerPool_, set its quit signal and notify it (under mReader_),
// until as many ids as there are worker threads have been handled.
1189  unsigned numFinishedThreads = 0;
1190  while (numFinishedThreads < workerThreads_.size()) {
1191  unsigned int tid;
1192  while (!workerPool_.try_pop(tid)) {usleep(10000);}
1193  std::unique_lock<std::mutex> lk(mReader_);
1194  thread_quit_signal[tid]=true;
1195  cvReader_[tid]->notify_one();
1196  numFinishedThreads++;
1197  }
// Join and delete every worker thread object.
1198  for (unsigned int i=0;i<workerThreads_.size();i++) {
1199  workerThreads_[i]->join();
1200  delete workerThreads_[i];
1201  }
1202 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
assert(m_qm.get())
def ls
Definition: eostools.py:348
void setInStateSup(FastMonitoringThread::InputState inputState)
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
volatile std::atomic< bool > shutdown_flag
int timeout
Definition: mps_check.py:51
std::vector< std::condition_variable * > cvReader_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
int grabNextJsonFile(boost::filesystem::path const &)
int j
Definition: DBlmapReader.cc:9
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
unsigned long long uint64_t
Definition: Time.h:15
tbb::concurrent_queue< InputFile * > fileQueue_
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
tbb::concurrent_vector< InputChunk * > chunks_
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
friend class InputFile
Open Root file and provide MEs ############.
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
tuple status
Definition: mps_update.py:57
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1204 of file FedRawDataInputSource.cc.

References assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, mps_update::diff, end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, plotBeamSpotDB::first, i, plotBeamSpotDB::last, LogDebug, mReader_, fileCollector::now, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

// readWorker(tid): worker loop. Each iteration pushes tid onto workerPool_, waits on
// cvReader_[tid], then reads the chunk assigned in workerJob_[tid] from its file and
// stores the filled chunk in the file's chunks_ vector.
// NOTE: this is a rendered listing; source lines 1254, 1265 and 1272 are absent from it.
1205 {
1206  bool init = true;
1207  threadInit_.exchange(true,std::memory_order_acquire);
1208 
1209  while (1) {
1210 
1211  tid_active_[tid]=false;
1212  std::unique_lock<std::mutex> lk(mReader_);
// NOTE(review): the next two lines are identical, so .second is never cleared here;
// the second one was presumably meant to reset workerJob_[tid].second - confirm.
1213  workerJob_[tid].first=nullptr;
1214  workerJob_[tid].first=nullptr;
1215 
1216  assert(!thread_quit_signal[tid]);//should never get it here
// Put this thread's id into workerPool_.
1217  workerPool_.push(tid);
1218 
// First iteration only: notify startupCv_ while holding startupLock_.
1219  if (init) {
1220  std::unique_lock<std::mutex> lk(startupLock_);
1221  init = false;
1222  startupCv_.notify_one();
1223  }
// Wait for a notification; mReader_ is released while waiting.
1224  cvReader_[tid]->wait(lk);
1225 
1226  if (thread_quit_signal[tid]) return;
1227  tid_active_[tid]=true;
1228 
1229  InputFile * file;
1230  InputChunk * chunk;
1231 
1232  assert(workerJob_[tid].first!=nullptr && workerJob_[tid].second!=nullptr);
1233 
1234  file = workerJob_[tid].first;
1235  chunk = workerJob_[tid].second;
1236 
// NOTE(review): lseek() is called before the descriptor is checked, so it runs even when
// open() failed; its result 'pos' is only logged and is not itself tested for failure.
1237  int fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1238  off_t pos = lseek(fileDescriptor,chunk->offset_,SEEK_SET);
1239 
1240 
1241  if (fileDescriptor>=0)
1242  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_ << " at offset " << pos;
1243  else
1244  {
// Open failure: log, set setExceptionState_ and end this worker thread.
1245  edm::LogError("FedRawDataInputSource") <<
1246  "readWorker failed to open file -: " << file->fileName_ << " fd:" << fileDescriptor <<
1247  " or seek to offset " << chunk->offset_ << ", lseek returned:" << pos;
1248  setExceptionState_=true;
1249  return;
1250 
1251  }
1252 
// Fill chunk->buf_ with up to readBlocks_ reads of eventChunkBlock_ bytes each;
// bufferLeft counts the bytes read so far and is the write offset for the next read.
1253  unsigned int bufferLeft = 0;
1255  for (unsigned int i=0;i<readBlocks_;i++)
1256  {
1257  const ssize_t last = ::read(fileDescriptor,( void*) (chunk->buf_+bufferLeft), eventChunkBlock_);
1258  if ( last > 0 )
1259  bufferLeft+=last;
// A short read ends the loop; the assert compares the chunk's usedSize_ with the bytes read.
1260  if (last < eventChunkBlock_) {
1261  assert(chunk->usedSize_==i*eventChunkBlock_+last);
1262  break;
1263  }
1264  }
// ('start' and 'end' are defined on source lines that are absent from this listing.)
1266  auto diff = end-start;
1267  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1268  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB" << " in " << msec.count() << " ms ("<< (bufferLeft >> 20)/double(msec.count())<<" GB/s)";
1269  close(fileDescriptor);
1270 
// If no version has been recorded yet, take it from the first 32-bit word of the chunk at file offset 0.
1271  if (detectedFRDversion_==0 && chunk->offset_==0) detectedFRDversion_=*((uint32*)chunk->buf_);
1273  chunk->readComplete_=true;//this is atomic to secure the sequential buffer fill before becoming available for processing)
1274  file->chunks_[chunk->fileIndex_]=chunk;//put the completed chunk in the file chunk vector at predetermined index
1275 
1276  }
1277 }
#define LogDebug(id)
int i
Definition: DBlmapReader.cc:9
virtual void read(edm::EventPrincipal &eventPrincipal) override
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
assert(m_qm.get())
std::vector< ReaderInfo > workerJob_
std::vector< std::condition_variable * > cvReader_
list diff
Definition: mps_update.py:85
U second(std::pair< T, U > const &p)
unsigned char * buf_
#define end
Definition: vmac.h:37
unsigned int fileIndex_
unsigned int uint32
Definition: MsgTools.h:13
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
std::vector< unsigned int > thread_quit_signal
std::vector< unsigned int > tid_active_
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1393 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

// reportEventsThisLumiInSource(lumi, events): under monlock_, adds 'events' to the
// sourceEventsReport_ entry for 'lumi' when that entry already exists.
// NOTE: the statement of the else-branch (source line 1401) is absent from this listing.
1394 {
1395 
1396  std::lock_guard<std::mutex> lock(monlock_);
1397  auto itr = sourceEventsReport_.find(lumi);
1398  if (itr!=sourceEventsReport_.end())
1399  itr->second+=events;
1400  else
1402 }
tuple lumi
Definition: fjr2json.py:35
std::map< unsigned int, unsigned int > sourceEventsReport_
tuple events
Definition: patZpeak.py:19
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 919 of file FedRawDataInputSource.cc.

// rewind_(): empty body, performs no action.
920 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1279 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

// threadError(): sets quit_threads_ and then throws a cms::Exception
// reporting a file reader thread error; it never returns normally.
1280 {
1281  quit_threads_=true;
1282  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1283 
1284 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend class InputChunk
friend

Definition at line 46 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend class InputFile
friend

Open Root file and provide MEs ############.

Definition at line 45 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

bool FedRawDataInputSource::chunkIsFree_ =false
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

InputFile* FedRawDataInputSource::currentFile_ = 0
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

unsigned int FedRawDataInputSource::currentLumiSection_
private
std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 89 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

uint32 FedRawDataInputSource::detectedFRDversion_ =0
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

jsoncollector::DataPointDefinition* FedRawDataInputSource::dpd_
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and grabNextJsonFile().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ =0
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 128 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private
std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int,std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 166 of file FedRawDataInputSource.h.

tbb::concurrent_queue<InputFile*> FedRawDataInputSource::fileQueue_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int,InputFile*> > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and grabNextJsonFile().

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 184 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and ~FedRawDataInputSource().

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private
bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int,unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 183 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int>* FedRawDataInputSource::streamFileTrackerPtr_ = 0
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyAdler32_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private