FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

bool checkNextEvent () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = 0
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ = 0
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = 0
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 36 of file FedRawDataInputSource.h.

Member Typedef Documentation

typedef std::pair< InputFile *, InputChunk * > FedRawDataInputSource::ReaderInfo
private

Definition at line 128 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 53 of file FedRawDataInputSource.cc.

References edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

54  : edm::RawInputSource(pset, desc),
55  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
56  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
57  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
58  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
59  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
60  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
61  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
62  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  eventID_(),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  long autoRunNumber = -1;
80  if (fileListMode_) {
81  autoRunNumber = initFileList();
82  if (!fileListLoopMode_) {
83  if (autoRunNumber < 0)
84  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
85  //override run number
86  runNumber_ = (edm::RunNumber_t)autoRunNumber;
87  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
88  }
89  }
90 
92  setNewRun();
93  //todo:autodetect from file name (assert if names differ)
95 
96  //make sure that chunk size is N * block size
101 
102  if (!numBuffers_)
103  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
104  << "no reading enabled with numBuffers parameter 0";
105 
108  readingFilesCount_ = 0;
109 
110  if (!crc32c_hw_test())
111  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
112 
113  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
114  if (fileListMode_) {
115  try {
117  } catch (cms::Exception const&) {
118  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
119  }
120  } else {
122  if (!fms_) {
123  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
124  }
125  }
126 
128  if (!daqDirector_)
129  throw cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
130 
132  if (useFileBroker_)
133  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
134  //set DaqDirector to delete files in preGlobalEndLumi callback
136  if (fms_) {
138  fms_->setInputSource(this);
141  }
142  //should delete chunks when run stops
143  for (unsigned int i = 0; i < numBuffers_; i++) {
145  }
146 
147  quit_threads_ = false;
148 
149  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
150  std::unique_lock<std::mutex> lk(startupLock_);
151  //issue a memory fence here and in threads (constructor was segfaulting without this)
152  thread_quit_signal.push_back(false);
153  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
154  cvReader_.push_back(new std::condition_variable);
155  tid_active_.push_back(0);
156  threadInit_.store(false, std::memory_order_release);
157  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
158  startupCv_.wait(lk);
159  }
160 
161  runAuxiliary()->setProcessHistoryID(processHistoryID_);
162 }
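
The loop at the end of the constructor starts each reader thread while holding startupLock_ and then waits on startupCv_, so the next thread is only created after the previous one has announced itself. The following is a minimal sketch of that handshake, not the CMSSW implementation: WorkerPool, run(), started_ and the quit members are hypothetical names, and the wait uses a predicate (an addition that the listing above does not show) to guard against spurious wakeups.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the reader pool; none of these names come from CMSSW.
class WorkerPool {
public:
  explicit WorkerPool(unsigned int nWorkers) {
    for (unsigned int i = 0; i < nWorkers; ++i) {
      std::unique_lock<std::mutex> lk(startupLock_);
      started_ = false;
      workers_.emplace_back(&WorkerPool::run, this);
      // Releases the lock while waiting; returns once the new worker has set started_.
      startupCv_.wait(lk, [this] { return started_; });
      assert(started_);
      ++confirmed_;
    }
  }

  ~WorkerPool() {
    {
      std::lock_guard<std::mutex> lk(quitLock_);
      quit_ = true;
    }
    quitCv_.notify_all();
    for (std::thread& t : workers_)
      t.join();
  }

  // Number of workers whose startup was acknowledged.
  unsigned int confirmed() const { return confirmed_; }

private:
  void run() {
    {
      // Blocks until the constructor is waiting, because it holds startupLock_ until then.
      std::lock_guard<std::mutex> lk(startupLock_);
      started_ = true;
    }
    startupCv_.notify_one();
    // Idle until the pool is destroyed.
    std::unique_lock<std::mutex> lk(quitLock_);
    quitCv_.wait(lk, [this] { return quit_; });
  }

  std::mutex startupLock_;
  std::condition_variable startupCv_;
  bool started_ = false;
  unsigned int confirmed_ = 0;
  std::mutex quitLock_;
  std::condition_variable quitCv_;
  bool quit_ = false;
  std::vector<std::thread> workers_;
};
```

Constructing `WorkerPool pool(3);` returns only after all three workers have reported in, which is the property the constructor above relies on before it touches shared state.
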
FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 164 of file FedRawDataInputSource.cc.

References cvReader_, filesToDelete_, mps_fire::i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, and workerThreads_.

164  {
165  quit_threads_ = true;
166 
167  //delete any remaining open files
168  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
169  std::string fileToDelete = it->second->fileName_;
170  it->second.release();
171  }
173  readSupervisorThread_->join();
174  } else {
175  //join aux threads in case the supervisor thread was not started
176  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
177  std::unique_lock<std::mutex> lk(mReader_);
178  thread_quit_signal[i] = true;
179  cvReader_[i]->notify_one();
180  lk.unlock();
181  workerThreads_[i]->join();
182  delete workerThreads_[i];
183  }
184  }
185  for (unsigned int i = 0; i < numConcurrentReads_; i++)
186  delete cvReader_[i];
187  /*
188  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
189  InputChunk *ch;
190  while (!freeChunks_.try_pop(ch)) {}
191  delete ch;
192  }
193  */
194 }
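
When the supervisor thread was never started, the destructor stops each reader itself: it sets that reader's quit flag under the shared mutex, notifies that reader's condition variable, releases the lock and joins. A rough sketch of the same sequence follows, under the assumption that each reader only waits for its quit flag. Reader and ReaderGroup are hypothetical types, and the sketch owns its condition variables through std::unique_ptr instead of the raw pointers used in the class.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical per-reader state; the real class keeps parallel vectors instead.
struct Reader {
  std::condition_variable cv;
  bool quit = false;  // guarded by ReaderGroup::mutex
  std::thread thread;
};

struct ReaderGroup {
  std::mutex mutex;
  std::vector<std::unique_ptr<Reader>> readers;
  std::atomic<unsigned int> exited{0};

  ~ReaderGroup() { stop(); }

  void start(unsigned int n) {
    for (unsigned int i = 0; i < n; ++i) {
      readers.push_back(std::make_unique<Reader>());
      Reader* r = readers.back().get();
      r->thread = std::thread([this, r] {
        std::unique_lock<std::mutex> lk(mutex);
        r->cv.wait(lk, [r] { return r->quit; });
        ++exited;
      });
    }
  }

  // Signal, wake and join every reader in turn, then free the per-reader state.
  void stop() {
    for (auto& r : readers) {
      {
        std::lock_guard<std::mutex> lk(mutex);
        r->quit = true;
      }
      r->cv.notify_one();
      r->thread.join();
    }
    readers.clear();
  }
};
```

Setting the flag under the mutex before notifying means a reader cannot miss the request, whether or not it has already reached its wait.
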

Member Function Documentation

bool FedRawDataInputSource::checkNextEvent ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 219 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

219  {
221  //this thread opens new files and dispatches reading to worker readers
222  //threadInit_.store(false,std::memory_order_release);
223  std::unique_lock<std::mutex> lk(startupLock_);
224  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor, this));
226  startupCv_.wait(lk);
227  }
228  //signal hltd to start event accounting
229  if (!currentLumiSection_)
231  if (fms_)
233  switch (nextEvent()) {
235  //maybe create EoL file in working directory before ending run
236  struct stat buf;
237  if (!useFileBroker_ && currentLumiSection_ > 0) {
238  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
239  if (eolFound) {
241  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
242  if (!found) {
244  int eol_fd =
245  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
246  close(eol_fd);
248  }
249  }
250  }
251  //also create EoR file in FU data directory
252  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
253  if (!eorFound) {
254  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
255  O_RDWR | O_CREAT,
256  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
257  close(eor_fd);
258  }
260  eventsThisLumi_ = 0;
262  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
263  return false;
264  }
266  //this is not reachable
267  return true;
268  }
270  //std::cout << "--------------NEW LUMI---------------" << std::endl;
271  return true;
272  }
273  default: {
274  if (!getLSFromFilename_) {
275  //get new lumi from file header
276  if (event_->lumi() > currentLumiSection_) {
278  eventsThisLumi_ = 0;
280  }
281  }
284  else
285  eventRunNumber_ = event_->run();
286  L1EventID_ = event_->event();
287 
288  setEventCached();
289 
290  return true;
291  }
292  }
293 }
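
At run end the listing above creates empty end-of-lumisection and end-of-run marker files when stat() reports that they do not exist yet. The following POSIX sketch shows that check-then-create step in isolation. createMarkerIfMissing and MarkerResult are hypothetical names; the open flags and permission bits are the ones visible in the listing.

```cpp
#include <cassert>
#include <fcntl.h>
#include <string>
#include <sys/stat.h>
#include <unistd.h>

enum class MarkerResult { AlreadyPresent, Created, Failed };

// Create an empty marker file at `path` unless something already exists there.
MarkerResult createMarkerIfMissing(const std::string& path) {
  struct stat buf;
  if (::stat(path.c_str(), &buf) == 0)
    return MarkerResult::AlreadyPresent;
  const int fd = ::open(path.c_str(),
                        O_RDWR | O_CREAT,
                        S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
  if (fd < 0)
    return MarkerResult::Failed;
  ::close(fd);
  return MarkerResult::Created;
}
```

Another process can create the file between the stat() and the open(). If that matters, adding O_EXCL to the open flags makes the creation atomic; that would be a change relative to the listing, which does not use it.
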
bool FedRawDataInputSource::exceptionState ( )
inline private

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 196 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

196  {
198  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
199  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
200  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
201  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
202  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
203  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
204  ->setComment("Maximum number of simultaneously buffered raw files");
205  desc.addUntracked<bool>("alwaysStartFromFirstLS", false)
206  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
207  desc.addUntracked<bool>("verifyChecksum", true)
208  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
209  desc.addUntracked<bool>("useL1EventID", false)
210  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
211  desc.addUntracked<bool>("fileListMode", false)
212  ->setComment("Use fileNames parameter to directly specify raw files to open");
213  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
214  ->setComment("file list used when fileListMode is enabled");
215  desc.setAllowAnything();
216  descriptions.add("source", desc);
217 }
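
The eventChunkSize and eventChunkBlock parameters are given in MB (the constructor multiplies both by 1048576), and the block size must not exceed the chunk size. Below is one possible way to check that relationship and derive the number of block reads per chunk. makeChunkLayout and ChunkLayout are hypothetical, and rejecting a chunk that is not a whole multiple of the block is an assumption of this sketch; the constructor lines that handle this case are not shown on this page.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <stdexcept>

constexpr std::uint32_t kBytesPerMB = 1048576;

struct ChunkLayout {
  std::uint32_t chunkBytes;  // size of one input buffer
  std::uint32_t blockBytes;  // size of one read call
  std::uint32_t readBlocks;  // read calls needed to fill a buffer
};

// Convert MB settings to bytes and require chunk = N * block with N >= 1.
ChunkLayout makeChunkLayout(unsigned int chunkMB, unsigned int blockMB) {
  if (chunkMB == 0 || blockMB == 0)
    throw std::invalid_argument("sizes must be non-zero");
  if (chunkMB > std::numeric_limits<std::uint32_t>::max() / kBytesPerMB)
    throw std::invalid_argument("chunk size does not fit in 32 bits");
  if (blockMB > chunkMB)
    throw std::invalid_argument("block size exceeds chunk size");
  if (chunkMB % blockMB != 0)
    throw std::invalid_argument("chunk size is not a multiple of block size");
  return ChunkLayout{chunkMB * kBytesPerMB, blockMB * kBytesPerMB, chunkMB / blockMB};
}
```

With the defaults shown above (32 and 32) this yields a single 33554432-byte read per chunk.
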
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData)
private

Definition at line 683 of file FedRawDataInputSource.cc.

References FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, FEDRawDataCollection::FEDData(), l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and ntuplemaker::time.

Referenced by read().

683  {
685  timeval stv;
686  gettimeofday(&stv, nullptr);
687  time = stv.tv_sec;
688  time = (time << 32) + stv.tv_usec;
689  edm::Timestamp tstamp(time);
690 
691  uint32_t eventSize = event_->eventSize();
692  unsigned char* event = (unsigned char*)event_->payload();
693  GTPEventID_ = 0;
694  tcds_pointer_ = nullptr;
695  while (eventSize > 0) {
696  assert(eventSize >= FEDTrailer::length);
697  eventSize -= FEDTrailer::length;
698  const FEDTrailer fedTrailer(event + eventSize);
699  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
700  assert(eventSize >= fedSize - FEDHeader::length);
701  eventSize -= (fedSize - FEDHeader::length);
702  const FEDHeader fedHeader(event + eventSize);
703  const uint16_t fedId = fedHeader.sourceID();
704  if (fedId > FEDNumbering::MAXFEDID) {
705  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
706  }
707  if (fedId == FEDNumbering::MINTCDSuTCAFEDID) {
708  tcds_pointer_ = event + eventSize;
709  }
710  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
711  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
712  GTPEventID_ = evf::evtn::get(event + eventSize, true);
713  else
714  GTPEventID_ = evf::evtn::get(event + eventSize, false);
715  //evf::evtn::evm_board_setformat(fedSize);
716  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
717  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
718  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
719  }
720  //take event ID from GTPE FED
721  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
722  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
723  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
724  }
725  }
726  FEDRawData& fedData = rawData.FEDData(fedId);
727  fedData.resize(fedSize);
728  memcpy(fedData.data(), event + eventSize, fedSize);
729  }
730  assert(eventSize == 0);
731 
732  return tstamp;
733 }
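
The loop above walks the event payload from the end towards the start: it reads a trailer to learn the fragment length in 8-byte words, steps back to the fragment's header, reads the source ID and copies the fragment. The sketch below reproduces only that traversal on a deliberately simplified layout. Storing the length in the last 4 bytes of the trailer and the ID in the first 2 bytes of the header is an assumption made for illustration, not the real FED header and trailer bit layout, and makeFragment and splitFragments are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <vector>

constexpr std::size_t kHeaderLen = 8;
constexpr std::size_t kTrailerLen = 8;

// Build a toy fragment: 8-byte header, payload, 8-byte trailer (simplified layout).
std::vector<unsigned char> makeFragment(std::uint16_t id, std::size_t payloadWords) {
  const std::uint32_t words = static_cast<std::uint32_t>(payloadWords + 2);
  std::vector<unsigned char> frag(static_cast<std::size_t>(words) * 8, 0);
  std::memcpy(frag.data(), &id, sizeof id);                          // ID at start of header
  std::memcpy(frag.data() + frag.size() - 4, &words, sizeof words);  // length at end of trailer
  return frag;
}

// Walk the buffer backwards, one fragment per iteration, keyed by source ID.
std::map<std::uint16_t, std::vector<unsigned char>> splitFragments(const std::vector<unsigned char>& event) {
  std::map<std::uint16_t, std::vector<unsigned char>> out;
  std::size_t remaining = event.size();
  while (remaining > 0) {
    if (remaining < kTrailerLen)
      throw std::runtime_error("truncated trailer");
    std::uint32_t words = 0;
    std::memcpy(&words, event.data() + remaining - 4, sizeof words);
    const std::size_t fragBytes = static_cast<std::size_t>(words) * 8;  // length counts 8-byte words
    if (fragBytes < kHeaderLen + kTrailerLen || fragBytes > remaining)
      throw std::runtime_error("bad fragment length");
    remaining -= fragBytes;  // now points at this fragment's header
    std::uint16_t id = 0;
    std::memcpy(&id, event.data() + remaining, sizeof id);
    const auto first = event.begin() + static_cast<std::ptrdiff_t>(remaining);
    out[id].assign(first, first + static_cast<std::ptrdiff_t>(fragBytes));
  }
  return out;
}
```

Unlike the listing, which relies on assert, the sketch throws on an inconsistent length so that a corrupt buffer is also caught in builds where assertions are disabled.
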
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1467 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1467  {
1468  std::lock_guard<std::mutex> lock(monlock_);
1469  auto itr = sourceEventsReport_.find(lumi);
1470  if (itr != sourceEventsReport_.end()) {
1471  std::pair<bool, unsigned int> ret(true, itr->second);
1472  if (erase)
1473  sourceEventsReport_.erase(itr);
1474  return ret;
1475  } else
1476  return std::pair<bool, unsigned int>(false, 0);
1477 }
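
getEventReport() is a mutex-guarded lookup in a per-lumisection map, with optional removal of the entry once it has been read. A self-contained sketch of the same pattern follows. EventReportBook and its report() method are hypothetical; report() stands in for whatever fills sourceEventsReport_ in the real class.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <utility>

class EventReportBook {
public:
  // Record the number of events seen for a lumisection.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] = events;
  }

  // Returns {true, count} if the lumisection is known, {false, 0} otherwise.
  // With erase == true the entry is removed after being read.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end())
      return {false, 0};
    std::pair<bool, unsigned int> ret(true, itr->second);
    if (erase)
      counts_.erase(itr);
    return ret;
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

A caller that wants each report exactly once passes erase = true; a later query for the same lumisection then returns {false, 0}.
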
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1509 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1512  {
1513  if (fileListIndex_ < fileNames_.size()) {
1514  nextFile = fileNames_[fileListIndex_];
1515  if (nextFile.find("file://") == 0)
1516  nextFile = nextFile.substr(7);
1517  else if (nextFile.find("file:") == 0)
1518  nextFile = nextFile.substr(5);
1519  boost::filesystem::path fileName = nextFile;
1520  std::string fileStem = fileName.stem().string();
1521  if (fileStem.find("ls") != std::string::npos)
1522  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1523  if (fileStem.find("_"))
1524  fileStem = fileStem.substr(0, fileStem.find("_"));
1525 
1526  if (!fileListLoopMode_)
1527  ls = boost::lexical_cast<unsigned int>(fileStem);
1528  else //always starting from LS 1 in loop mode
1529  ls = 1 + loopModeIterationInc_;
1530 
1531  //fsize = 0;
1532  //lockWaitTime = 0;
1533  fileListIndex_++;
1535  } else {
1536  if (!fileListLoopMode_)
1538  else {
1539  //loop through files until interrupted
1541  fileListIndex_ = 0;
1542  return getFile(ls, nextFile, fsize, lockWaitTime);
1543  }
1544  }
1545 }
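
In file list mode, getFile() strips a leading file:// or file: prefix and then derives the lumisection number from the digits that follow "ls" in the file stem. The sketch below isolates that parsing using only the standard library. stripFilePrefix and lumiFromFileName are hypothetical helpers, the example file name is an assumed pattern, and the explicit std::string::npos comparisons are a choice of this sketch.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <optional>
#include <string>

// Remove a leading "file://" or "file:" if present.
std::string stripFilePrefix(const std::string& name) {
  if (name.rfind("file://", 0) == 0)
    return name.substr(7);
  if (name.rfind("file:", 0) == 0)
    return name.substr(5);
  return name;
}

// Extract the number following "ls" in the file stem, if there is one.
std::optional<unsigned int> lumiFromFileName(const std::string& path) {
  // Stem: text after the last '/' and before the last '.' (npos + 1 wraps to 0).
  std::string stem = path.substr(path.find_last_of('/') + 1);
  const std::size_t dot = stem.find_last_of('.');
  if (dot != std::string::npos)
    stem.erase(dot);

  const std::size_t ls = stem.find("ls");
  if (ls == std::string::npos)
    return std::nullopt;
  const std::size_t begin = ls + 2;
  std::size_t end = begin;
  while (end < stem.size() && std::isdigit(static_cast<unsigned char>(stem[end])))
    ++end;
  if (end == begin)
    return std::nullopt;  // "ls" not followed by digits
  return static_cast<unsigned int>(std::stoul(stem.substr(begin, end - begin)));
}
```

Returning std::optional lets the caller decide how to treat a name without a lumisection field, where the listing's boost::lexical_cast would throw.
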
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 341 of file FedRawDataInputSource.cc.

References bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

341  {
342  if (setExceptionState_)
343  threadError();
344  if (!currentFile_.get()) {
346  if (fms_)
348  if (!fileQueue_.try_pop(currentFile_)) {
349  //sleep until wakeup (only in single-buffer mode) or timeout
350  std::unique_lock<std::mutex> lkw(mWakeup_);
351  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
353  }
354  status = currentFile_->status_;
355  if (status == evf::EvFDaqDirector::runEnded) {
356  if (fms_)
358  currentFile_.release();
359  return status;
360  } else if (status == evf::EvFDaqDirector::runAbort) {
361  throw cms::Exception("FedRawDataInputSource::getNextEvent")
362  << "Run has been aborted by the input source reader thread";
363  } else if (status == evf::EvFDaqDirector::newLumi) {
364  if (fms_)
366  if (getLSFromFilename_) {
367  if (currentFile_->lumi_ > currentLumiSection_) {
369  eventsThisLumi_ = 0;
371  }
372  } else { //let this be picked up from next event
374  }
375 
376  currentFile_.release();
377  return status;
378  } else if (status == evf::EvFDaqDirector::newFile) {
380  } else
381  assert(false);
382  }
383  if (fms_)
385 
386  //file is empty
387  if (!currentFile_->fileSize_) {
389  //try to open new lumi
390  assert(currentFile_->nChunks_ == 0);
391  if (getLSFromFilename_)
392  if (currentFile_->lumi_ > currentLumiSection_) {
394  eventsThisLumi_ = 0;
396  }
397  //immediately delete empty file
398  std::string currentName = currentFile_->fileName_;
399  currentFile_.release();
401  }
402 
403  //file is finished
404  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
406  //release last chunk (it is never released elsewhere)
407  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
408  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
409  throw cms::Exception("FedRawDataInputSource::getNextEvent")
410  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
411  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
412  }
413  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
414  if (singleBufferMode_) {
415  std::unique_lock<std::mutex> lkw(mWakeup_);
416  cvWakeup_.notify_one();
417  }
418  bufferInputRead_ = 0;
420  //put the file in pending delete list;
421  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
422  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
423  } else {
424  //in single-thread and stream jobs, events are already processed
425  std::string currentName = currentFile_->fileName_;
426  currentFile_.release();
427  }
429  }
430 
431  //handle RAW file header
432  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
433  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
434  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
435  throw cms::Exception("FedRawDataInputSource::getNextEvent")
436  << "Premature end of input file while reading file header";
437 
438  edm::LogWarning("FedRawDataInputSource")
439  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
440  if (getLSFromFilename_)
441  if (currentFile_->lumi_ > currentLumiSection_) {
443  eventsThisLumi_ = 0;
445  }
446  }
447 
448  //advance buffer position to skip file header (chunk will be acquired later)
449  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
450  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
451  }
452 
453  //file is too short
454  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
455  throw cms::Exception("FedRawDataInputSource::getNextEvent")
456  << "Premature end of input file while reading event header";
457  }
458  if (singleBufferMode_) {
459  //should already be there
460  if (fms_)
462  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
463  usleep(10000);
464  if (currentFile_->parent_->exceptionState() || setExceptionState_)
465  currentFile_->parent_->threadError();
466  }
467  if (fms_)
469 
470  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
471 
472  //conditions when read amount is not sufficient for the header to fit
473  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
474  eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
476 
477  if (detectedFRDversion_ == 0) {
478  detectedFRDversion_ = *((uint32*)dataPosition);
479  if (detectedFRDversion_ > 5)
480  throw cms::Exception("FedRawDataInputSource::getNextEvent")
481  << "Unknown FRD version -: " << detectedFRDversion_;
482  assert(detectedFRDversion_ >= 1);
483  }
484 
485  //recalculate chunk position
486  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
487  if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
488  throw cms::Exception("FedRawDataInputSource::getNextEvent")
489  << "Premature end of input file while reading event header";
490  }
491  }
492 
493  event_.reset(new FRDEventMsgView(dataPosition));
494  if (event_->size() > eventChunkSize_) {
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
497  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
498  << " bytes";
499  }
500 
501  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
502 
503  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event data";
506  }
507  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
511  event_.reset(new FRDEventMsgView(dataPosition));
512  }
513  currentFile_->bufferPosition_ += event_->size();
514  currentFile_->chunkPosition_ += event_->size();
515  //last chunk is released when this function is invoked next time
516 
517  }
518  //multibuffer mode:
519  else {
520  //wait for the current chunk to become added to the vector
521  if (fms_)
523  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
524  usleep(10000);
525  if (setExceptionState_)
526  threadError();
527  }
528  if (fms_)
530 
531  //check if header is at the boundary of two chunks
532  chunkIsFree_ = false;
533  unsigned char* dataPosition;
534 
535  //read header, copy it to a single chunk if necessary
536  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
537 
538  event_.reset(new FRDEventMsgView(dataPosition));
539  if (event_->size() > eventChunkSize_) {
540  throw cms::Exception("FedRawDataInputSource::getNextEvent")
541  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
542  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
543  << " bytes";
544  }
545 
546  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
547 
548  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
549  throw cms::Exception("FedRawDataInputSource::getNextEvent")
550  << "Premature end of input file while reading event data";
551  }
552 
553  if (chunkEnd) {
554  //header was at the chunk boundary, we will have to move payload as well
555  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
556  chunkIsFree_ = true;
557  } else {
558  //header was contiguous, but check if payload fits the chunk
559  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
560  //rewind to header start position
561  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
562  //copy event to a chunk start and move pointers
563  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
564  assert(chunkEnd);
565  chunkIsFree_ = true;
566  //header is moved
567  event_.reset(new FRDEventMsgView(dataPosition));
568  } else {
569  //everything is in a single chunk, only move pointers forward
570  chunkEnd = currentFile_->advance(dataPosition, msgSize);
571  assert(!chunkEnd);
572  chunkIsFree_ = false;
573  }
574  }
575  } //end multibuffer mode
576  if (fms_)
578 
579  if (verifyChecksum_ && event_->version() >= 5) {
580  uint32_t crc = 0;
581  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
582  if (crc != event_->crc32c()) {
583  if (fms_)
585  throw cms::Exception("FedRawDataInputSource::getNextEvent")
586  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
587  << crc;
588  }
589  } else if (verifyChecksum_ && event_->version() >= 3) {
590  uint32_t adler = adler32(0L, Z_NULL, 0);
591  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
592 
593  if (adler != event_->adler32()) {
594  if (fms_)
596  throw cms::Exception("FedRawDataInputSource::getNextEvent")
597  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
598  << adler;
599  }
600  }
601  if (fms_)
603 
604  currentFile_->nProcessed_++;
605 
607 }
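In the multibuffer branch of the listing, three situations are distinguished: the event header straddles a chunk boundary, the header fits but the payload does not, or the whole event is contiguous. `chunkIsFree_` is set to `true` in the first two cases and `false` in the third. The size comparisons behind that decision can be sketched as below. The behaviour of `InputFile::advance` is not shown in this section, so the sketch models only the arithmetic; `EventPlacement` and `classifyEvent` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical classification of where an event lies relative to one chunk.
enum class EventPlacement {
  HeaderCrossesBoundary,   // the header itself does not fit in the current chunk
  PayloadCrossesBoundary,  // the header fits, the payload does not
  Contiguous               // header and payload both fit
};

// chunkSize: capacity of one chunk in bytes.
// chunkPos:  read offset inside the chunk before the header is consumed.
// Assumed precondition: chunkPos <= chunkSize.
EventPlacement classifyEvent(uint32_t chunkSize, uint32_t chunkPos, uint32_t headerSize, uint32_t payloadSize) {
  assert(chunkPos <= chunkSize);
  const uint32_t room = chunkSize - chunkPos;
  if (room < headerSize)
    return EventPlacement::HeaderCrossesBoundary;
  if (room - headerSize < payloadSize)
    return EventPlacement::PayloadCrossesBoundary;
  return EventPlacement::Contiguous;
}
```

Subtracting before comparing (`room - headerSize < payloadSize`) avoids the unsigned overflow that `chunkPos + headerSize + payloadSize` could produce for large sizes.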
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void setInState(FastMonitoringThread::InputState inputState)
const uint32 FRDHeaderVersionSize[6]
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
unsigned int uint32
Definition: MsgTools.h:13
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::unique_ptr< FRDEventMsgView > event_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
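According to the `getNextEvent()` listing, payloads with event version 5 or higher are checked with CRC32C, while versions 3 and 4 fall through to an Adler-32 check computed with zlib's `adler32`. For readers unfamiliar with Adler-32, a byte-by-byte reference sketch is shown below. It is a plain textbook implementation rather than the zlib routine the source calls, and the names `adler32Simple` and `payloadMatches` are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Textbook Adler-32: two running sums modulo 65521, combined as (b << 16) | a.
// The initial state a = 1, b = 0 corresponds to the checksum of an empty input.
uint32_t adler32Simple(const unsigned char* data, std::size_t len) {
  const uint32_t kMod = 65521;  // largest prime below 2^16
  uint32_t a = 1;
  uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Hypothetical helper: true when the computed checksum equals the stored one.
bool payloadMatches(const unsigned char* data, std::size_t len, uint32_t expected) {
  return adler32Simple(data, len) == expected;
}
```

Reducing modulo 65521 on every byte is slower than zlib's deferred reduction but keeps the arithmetic easy to follow.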
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1479 of file FedRawDataInputSource.cc.

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, castor_dqm_sourceclient_file_cfg::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1479  {
1480  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1481  if (a.rfind("/") != std::string::npos)
1482  a = a.substr(a.rfind("/"));
1483  if (b.rfind("/") != std::string::npos)
1484  b = b.substr(b.rfind("/"));
1485  return b > a;
1486  });
1487 
1488  if (!fileNames_.empty()) {
1489  //get run number from first file in the vector
1491  std::string fileStem = fileName.stem().string();
1492  auto end = fileStem.find("_");
1493  if (fileStem.find("run") == 0) {
1494  std::string runStr = fileStem.substr(3, end - 3);
1495  try {
1496  //get long to support test run numbers < 2^32
1497  long rval = boost::lexical_cast<long>(runStr);
1498  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1499  return rval;
1500  } catch (boost::bad_lexical_cast const&) {
1501  edm::LogWarning("FedRawDataInputSource")
1502  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1503  }
1504  }
1505  }
1506  return -1;
1507 }
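The listing sorts the input files by the part of the path after the last `/` and then reads the run number from a stem that starts with `run`. A minimal sketch of both steps using only the standard library follows. The helper names are hypothetical, `std::stol` replaces `boost::lexical_cast`, and the `run<digits>_...` naming pattern is inferred from the listing rather than documented here.

```cpp
#include <algorithm>
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: last path component (text after the final '/').
std::string baseName(const std::string& path) {
  const std::string::size_type slash = path.rfind('/');
  return slash == std::string::npos ? path : path.substr(slash + 1);
}

// Orders files by base name so that directory prefixes do not affect the order.
void sortByBaseName(std::vector<std::string>& files) {
  std::sort(files.begin(), files.end(),
            [](const std::string& a, const std::string& b) { return baseName(a) < baseName(b); });
}

// Returns the run number, or -1 when the name does not start with "run<digits>".
long runNumberFromName(const std::string& path) {
  const std::string name = baseName(path);
  if (name.compare(0, 3, "run") != 0)
    return -1;
  // Take the characters between "run" and the first '_' (or the end of the name).
  const std::string::size_type end = name.find('_');
  const std::string digits = name.substr(3, end == std::string::npos ? std::string::npos : end - 3);
  if (digits.empty() || digits.find_first_not_of("0123456789") != std::string::npos)
    return -1;
  try {
    return std::stol(digits);
  } catch (const std::exception&) {
    return -1;  // e.g. more digits than fit into a long
  }
}
```

Passing the strings by const reference in the comparator avoids the two string copies per comparison that a by-value lambda incurs.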
std::vector< std::string > fileNames_
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 295 of file FedRawDataInputSource.cc.

References evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNextEvent(), and getNextEvent().

295  {
296  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
297  if (!useFileBroker_) {
298  if (currentLumiSection_ > 0) {
300  struct stat buf;
301  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
302  if (!found) {
304  int eol_fd =
305  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
306  close(eol_fd);
307  daqDirector_->createBoLSFile(lumiSection, false);
309  }
310  } else
311  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
312  }
313 
314  currentLumiSection_ = lumiSection;
315 
317 
318  timeval tv;
319  gettimeofday(&tv, nullptr);
320  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
321 
323  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
324 
325  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
326  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
327 
328  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
329  }
330 }
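The lumisection open time in the listing is built from `gettimeofday` as seconds times 1000000 plus microseconds, with both fields cast to `unsigned long long` before the arithmetic. A small sketch of that conversion is shown below; `toMicroseconds` and `nowMicroseconds` are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/time.h>

// Hypothetical helper mirroring the arithmetic in the listing.
// Each field is widened to 64 bits before multiplying so the product cannot
// overflow on platforms where tv_sec is a 32-bit type.
uint64_t toMicroseconds(const timeval& tv) {
  return static_cast<uint64_t>(tv.tv_sec) * 1000000ULL + static_cast<uint64_t>(tv.tv_usec);
}

// Current wall-clock time in microseconds since the Unix epoch.
uint64_t nowMicroseconds() {
  timeval tv;
  gettimeofday(&tv, nullptr);
  return toMicroseconds(tv);
}
```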
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:436
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
edm::ProcessHistoryID processHistoryID_
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:441
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline private

Definition at line 332 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNextEvent().

332  {
334  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
335  if (edm::shutdown_flag.load(std::memory_order_relaxed))
336  break;
337  }
338  return status;
339 }
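`nextEvent()` keeps calling `getNextEvent()` while it reports `noFile`, and leaves the loop early when the global shutdown flag is set. The same control flow can be sketched generically as below. The `FileStatus` enum here is a hypothetical stand-in for `evf::EvFDaqDirector::FileStatus`, and `pollUntilReady` is an illustrative name.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for evf::EvFDaqDirector::FileStatus.
enum class FileStatus { noFile, newFile, runEnded };

// Calls `poll` until it returns something other than noFile, or until
// `shutdown` is observed to be true after an unsuccessful poll.
template <typename Poll>
FileStatus pollUntilReady(Poll&& poll, const std::atomic<bool>& shutdown) {
  FileStatus status;
  while ((status = poll()) == FileStatus::noFile) {
    // A relaxed load is enough here: the flag is only a stop request and
    // does not publish any other data to this thread.
    if (shutdown.load(std::memory_order_relaxed))
      break;
  }
  return status;
}
```

The poll is always attempted at least once, so a caller still receives a status even when shutdown was requested before the call.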
volatile std::atomic< bool > shutdown_flag
evf::EvFDaqDirector::FileStatus getNextEvent()
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
override protected virtual

Implements edm::RawInputSource.

Definition at line 609 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, watchdog::const, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), l1tstage2_dqm_sourceclient-live_cfg::rawData, evf::FastMonitoringService::setInState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::goto(), readNextChunkIntoBuffer(), readWorker(), and Vispa.Plugins.EdmBrowser.EdmDataAccessor.EdmDataAccessor::setFilterBranches().

609  {
610  if (fms_)
612  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
614 
615  if (useL1EventID_) {
618  aux.setProcessHistoryID(processHistoryID_);
619  makeEvent(eventPrincipal, aux);
620  } else if (tcds_pointer_ == nullptr) {
621  assert(GTPEventID_);
624  aux.setProcessHistoryID(processHistoryID_);
625  makeEvent(eventPrincipal, aux);
626  } else {
627  const FEDHeader fedHeader(tcds_pointer_);
628  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
633  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
634  processGUID(),
637  makeEvent(eventPrincipal, aux);
638  }
639 
640  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
641 
643 
644  eventsThisLumi_++;
645  if (fms_)
647 
648  //resize vector if needed
649  while (streamFileTracker_.size() <= eventPrincipal.streamID())
650  streamFileTracker_.push_back(-1);
651 
652  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
653 
654  //this old file check runs no more often than every 10 events
655  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
656  //delete files that are not in processing
657  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
658  auto it = filesToDelete_.begin();
659  while (it != filesToDelete_.end()) {
660  bool fileIsBeingProcessed = false;
661  for (unsigned int i = 0; i < nStreams_; i++) {
662  if (it->first == streamFileTracker_.at(i)) {
663  fileIsBeingProcessed = true;
664  break;
665  }
666  }
667  if (!fileIsBeingProcessed) {
668  std::string fileToDelete = it->second->fileName_;
669  //it->second.release();
670  it = filesToDelete_.erase(it);
671  } else
672  it++;
673  }
674  }
675  if (chunkIsFree_)
676  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
677  chunkIsFree_ = false;
678  if (fms_)
680  return;
681 }
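The cleanup loop in `read()` walks `filesToDelete_` and erases every entry whose file index is no longer recorded in `streamFileTracker_`. A minimal sketch of that erase-while-iterating pattern follows. `PendingFile` is a hypothetical stand-in for `InputFile`, and the sketch scans the whole tracker vector instead of the first `nStreams_` entries.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for InputFile.
struct PendingFile {
  std::string name;
};

using PendingList = std::list<std::pair<int, std::unique_ptr<PendingFile>>>;

// Removes every pending file whose index is not referenced by any stream.
// Returns the number of entries removed.
std::size_t purgeUnused(PendingList& pending, const std::vector<int>& tracker) {
  std::size_t removed = 0;
  for (auto it = pending.begin(); it != pending.end();) {
    const bool inUse = std::find(tracker.begin(), tracker.end(), it->first) != tracker.end();
    if (inUse) {
      ++it;
    } else {
      // erase() returns the next valid iterator; the unique_ptr destroys the file object.
      it = pending.erase(it);
      ++removed;
    }
  }
  return removed;
}
```

Using the iterator returned by `erase` is what keeps the loop valid, since the erased node's own iterator can no longer be incremented.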
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
static const uint32_t length
Definition: FEDHeader.h:54
void setInState(FastMonitoringThread::InputState inputState)
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:204
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: TCDSRaw.h:16
std::vector< int > streamFileTracker_
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
void setProcessHistoryID(ProcessHistoryID const &phid)
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
std::unique_ptr< InputFile > currentFile_
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1401 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, mps_fire::i, dqmdumpme::last, LogDebug, InputFile::rawFd_, InputFile::rawHeaderSize_, read(), and readBlocks_.

Referenced by getNextEvent().

1401  {
1402  uint32_t existingSize = 0;
1403 
1404  if (fileDescriptor_ < 0) {
1405  bufferInputRead_ = 0;
1406  if (file->rawFd_ == -1)
1407  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1408  else {
1409  fileDescriptor_ = file->rawFd_;
1410  //skip header size in destination buffer (chunk position was already adjusted)
1412  existingSize += file->rawHeaderSize_;
1413  }
1414  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1415  if (fileDescriptor_ >= 0)
1416  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1417  else {
1418  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1419  << "failed to open file " << std::endl
1420  << file->fileName_ << " fd:" << fileDescriptor_;
1421  }
1422  }
1423 
1424  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1425  for (unsigned int i = 0; i < readBlocks_; i++) {
1426  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1428  existingSize += last;
1429  }
1430  } else {
1431  const uint32_t chunksize = file->chunkPosition_;
1432  const uint32_t blockcount = chunksize / eventChunkBlock_;
1433  const uint32_t leftsize = chunksize % eventChunkBlock_;
1434  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1435  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1436 
1437  for (uint32_t i = 0; i < blockcount; i++) {
1438  const ssize_t last =
1439  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1441  existingSizeLeft += last;
1442  }
1443  if (leftsize) {
1444  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1446  }
1447  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1448  }
1449  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1450  if (fileDescriptor_ != -1) {
1451  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1452  close(fileDescriptor_);
1453  file->rawFd_ = fileDescriptor_ = -1;
1454  }
1455  }
1456 }
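When `chunkPosition_` is non-zero, the listing first moves the unread tail of the buffer to the front with `memmove` and then refills the freed space in `eventChunkBlock_`-sized reads plus one shorter read for the remainder. The bookkeeping can be sketched without any file I/O as below. `RefillPlan` and `compactChunk` are hypothetical names, and the buffer is modelled as a `std::vector`.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical summary of what has to happen after compaction.
struct RefillPlan {
  uint32_t keptBytes;   // unread bytes now sitting at the start of the buffer
  uint32_t fullBlocks;  // number of blockSize-sized reads needed
  uint32_t tailBytes;   // size of the final, shorter read (0 if none)
};

// Moves the unread range [chunkPos, buf.size()) to the front of `buf` and
// reports how the freed `chunkPos` bytes would be refilled.
RefillPlan compactChunk(std::vector<unsigned char>& buf, uint32_t chunkPos, uint32_t blockSize) {
  assert(blockSize > 0 && chunkPos <= buf.size());
  const uint32_t kept = static_cast<uint32_t>(buf.size()) - chunkPos;
  // memmove (not memcpy) because source and destination ranges may overlap.
  std::memmove(buf.data(), buf.data() + chunkPos, kept);
  return RefillPlan{kept, chunkPos / blockSize, chunkPos % blockSize};
}
```

The three fields always add up to the buffer size (`keptBytes + fullBlocks * blockSize + tailBytes`), which is a convenient invariant to check when adapting the sketch.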
#define LogDebug(id)
uint32_t chunkPosition_
void read(edm::EventPrincipal &eventPrincipal) override
uint16_t rawHeaderSize_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 737 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), mps_fire::i, dqmiodatasetharvest::inf, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, createfilelist::int, dqmiolumiharvest::j, LogDebug, eostools::ls(), maxBufferedFiles_, min(), eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, hgcalPlots::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, 
workerPool_, and workerThreads_.

Referenced by checkNextEvent().

737  {
738  bool stop = false;
739  unsigned int currentLumiSection = 0;
740  //threadInit_.exchange(true,std::memory_order_acquire);
741 
742  {
743  std::unique_lock<std::mutex> lk(startupLock_);
744  startupCv_.notify_one();
745  }
746 
747  uint32_t ls = 0;
748  uint32_t monLS = 1;
749  uint32_t lockCount = 0;
750  uint64_t sumLockWaitTimeUs = 0.;
751 
752  while (!stop) {
753  //wait for at least one free thread and chunk
754  int counter = 0;
755  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
757  //report state to monitoring
758  if (fms_) {
759  bool copy_active = false;
760  for (auto j : tid_active_)
761  if (j)
762  copy_active = true;
765  else if (freeChunks_.empty()) {
766  if (copy_active)
768  else
770  } else {
771  if (copy_active)
773  else
775  }
776  }
777  std::unique_lock<std::mutex> lkw(mWakeup_);
778  //sleep until woken up by condition or a timeout
779  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
780  counter++;
781  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
782  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
783  } else {
784  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
785  }
786  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
787  stop = true;
788  break;
789  }
790  }
791  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
792 
793  if (stop)
794  break;
795 
796  //look for a new file
797  std::string nextFile;
798  uint32_t fileSizeIndex;
799  int64_t fileSizeFromMetadata;
800 
801  if (fms_) {
805  }
806 
808  uint16_t rawHeaderSize = 0;
809  uint32_t lsFromRaw = 0;
810  int32_t serverEventsInNewFile = -1;
811  int rawFd = -1;
812 
813  int backoff_exp = 0;
814 
815  //entering loop which tries to grab new file from ramdisk
816  while (status == evf::EvFDaqDirector::noFile) {
817  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
818  stop = true;
819  break;
820  }
821 
822  assert(rawFd == -1);
823  uint64_t thisLockWaitTimeUs = 0.;
824  if (fileListMode_) {
825  //return LS if LS not set, otherwise return file
826  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
827  if (status == evf::EvFDaqDirector::newFile) {
829  rawFd,
830  rawHeaderSize,
831  lsFromRaw,
832  serverEventsInNewFile,
833  fileSizeFromMetadata,
834  false,
835  false,
836  false) != 0) {
837  //error
838  setExceptionState_ = true;
839  stop = true;
840  break;
841  }
842  if (!getLSFromFilename_)
843  ls = lsFromRaw;
844  }
845  } else if (!useFileBroker_)
846  status = daqDirector_->updateFuLock(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
847  else {
848  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
849  ls,
850  nextFile,
851  rawFd,
852  rawHeaderSize,
853  serverEventsInNewFile,
854  fileSizeFromMetadata,
855  thisLockWaitTimeUs);
856  }
857 
858  if (fms_)
860 
861  //cycle through all remaining LS even if no files get assigned
862  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
864 
865  //monitoring of lock wait time
866  if (thisLockWaitTimeUs > 0.)
867  sumLockWaitTimeUs += thisLockWaitTimeUs;
868  lockCount++;
869  if (ls > monLS) {
870  monLS = ls;
871  if (lockCount)
872  if (fms_)
873  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
874  lockCount = 0;
875  sumLockWaitTimeUs = 0;
876  }
877 
878  //check again for any remaining index/EoLS files after EoR file is seen
880  if (fms_)
882  usleep(100000);
883  //now all files should have appeared in ramdisk, check again if any raw files were left behind
884  status = daqDirector_->updateFuLock(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
885  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
887  }
888 
889  if (status == evf::EvFDaqDirector::runEnded) {
890  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
891  fileQueue_.push(std::move(inf));
892  stop = true;
893  break;
894  }
895 
896  //error from filelocking function
897  if (status == evf::EvFDaqDirector::runAbort) {
898  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
899  fileQueue_.push(std::move(inf));
900  stop = true;
901  break;
902  }
903  //queue new lumisection
904  if (getLSFromFilename_) {
905  if (ls > currentLumiSection) {
906  if (!useFileBroker_) {
907  //file locking
908  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
909  currentLumiSection = ls;
910  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
911  fileQueue_.push(std::move(inf));
912  } else {
913  //new file service
914  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
915  if (ls < 100) {
916  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
917  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
918 
919  for (unsigned int nextLS = lsToStart; nextLS <= ls; nextLS++) {
920  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
921  fileQueue_.push(std::move(inf));
922  }
923  } else {
924  //start from current LS
925  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
926  fileQueue_.push(std::move(inf));
927  }
928  } else {
929  //queue all lumisections after last one seen to avoid gaps
930  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
931  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
932  fileQueue_.push(std::move(inf));
933  }
934  }
935  currentLumiSection = ls;
936  }
937  }
938  //else
939  if (currentLumiSection > 0 && ls < currentLumiSection) {
940  edm::LogError("FedRawDataInputSource")
941  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
942  << ". Aborting execution." << std::endl;
943  if (rawFd != -1)
944  close(rawFd);
945  rawFd = -1;
946  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
947  fileQueue_.push(std::move(inf));
948  stop = true;
949  break;
950  }
951  }
952 
953  int dbgcount = 0;
954  if (status == evf::EvFDaqDirector::noFile) {
955  if (fms_)
957  dbgcount++;
958  if (!(dbgcount % 20))
959  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
960  if (!useFileBroker_)
961  usleep(100000);
962  else {
963  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
964  //backoff_exp=0; // disabled!
965  int sleeptime = (int)(100000. * pow(2, backoff_exp));
966  usleep(sleeptime);
967  backoff_exp++;
968  }
969  } else
970  backoff_exp = 0;
971  }
972  //end of file grab loop, parse result
973  if (status == evf::EvFDaqDirector::newFile) {
974  if (fms_)
976  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
977 
978  std::string rawFile;
979  //file service will report raw extension
980  if (useFileBroker_)
981  rawFile = nextFile;
982  else {
983  boost::filesystem::path rawFilePath(nextFile);
984  rawFile = rawFilePath.replace_extension(".raw").string();
985  }
986 
987  struct stat st;
988  int stat_res = stat(rawFile.c_str(), &st);
989  if (stat_res == -1) {
990  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
991  setExceptionState_ = true;
992  break;
993  }
994  uint64_t fileSize = st.st_size;
995 
996  if (fms_) {
1000  }
1001  int eventsInNewFile;
1002  if (fileListMode_) {
1003  if (fileSize == 0)
1004  eventsInNewFile = 0;
1005  else
1006  eventsInNewFile = -1;
1007  } else {
1009  if (!useFileBroker_)
1010  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1011  else
1012  eventsInNewFile = serverEventsInNewFile;
1013  assert(eventsInNewFile >= 0);
1014  assert((eventsInNewFile > 0) ==
1015  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1016  }
1017 
1018  if (!singleBufferMode_) {
1019  //calculate number of needed chunks
1020  unsigned int neededChunks = fileSize / eventChunkSize_;
1021  if (fileSize % eventChunkSize_)
1022  neededChunks++;
1023 
1024  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1025  ls,
1026  rawFile,
1027  !fileListMode_,
1028  rawFd,
1029  fileSize,
1030  rawHeaderSize,
1031  neededChunks,
1032  eventsInNewFile,
1033  this));
1035  auto newInputFilePtr = newInputFile.get();
1036  fileQueue_.push(std::move(newInputFile));
1037 
1038  for (unsigned int i = 0; i < neededChunks; i++) {
1039  if (fms_) {
1040  bool copy_active = false;
1041  for (auto j : tid_active_)
1042  if (j)
1043  copy_active = true;
1044  if (copy_active)
1046  else
1048  }
1049  //get thread
1050  unsigned int newTid = 0xffffffff;
1051  while (!workerPool_.try_pop(newTid)) {
1052  usleep(100000);
1053  if (quit_threads_.load(std::memory_order_relaxed)) {
1054  stop = true;
1055  break;
1056  }
1057  }
1058 
1059  if (fms_) {
1060  bool copy_active = false;
1061  for (auto j : tid_active_)
1062  if (j)
1063  copy_active = true;
1064  if (copy_active)
1066  else
1068  }
1069  InputChunk* newChunk = nullptr;
1070  while (!freeChunks_.try_pop(newChunk)) {
1071  usleep(100000);
1072  if (quit_threads_.load(std::memory_order_relaxed)) {
1073  stop = true;
1074  break;
1075  }
1076  }
1077 
1078  if (newChunk == nullptr) {
1079  //return unused tid if we received shutdown (nullptr chunk)
1080  if (newTid != 0xffffffff)
1081  workerPool_.push(newTid);
1082  stop = true;
1083  break;
1084  }
1085  if (stop)
1086  break;
1087  if (fms_)
1089 
1090  std::unique_lock<std::mutex> lk(mReader_);
1091 
1092  unsigned int toRead = eventChunkSize_;
1093  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1094  toRead = fileSize % eventChunkSize_;
1095  newChunk->reset(i * eventChunkSize_, toRead, i);
1096 
1097  workerJob_[newTid].first = newInputFilePtr;
1098  workerJob_[newTid].second = newChunk;
1099 
1100  //wake up the worker thread
1101  cvReader_[newTid]->notify_one();
1102  }
1103  } else {
1104  if (!eventsInNewFile) {
1105  if (rawFd != -1) {
1106  close(rawFd);
1107  rawFd = -1;
1108  }
1109  //still queue file for lumi update
1110  std::unique_lock<std::mutex> lkw(mWakeup_);
1111  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1112  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1113  ls,
1114  rawFile,
1115  !fileListMode_,
1116  rawFd,
1117  fileSize,
1118  rawHeaderSize,
1119  (rawHeaderSize > 0),
1120  0,
1121  this));
1123  fileQueue_.push(std::move(newInputFile));
1124  cvWakeup_.notify_one();
1125  break;
1126  }
1127  //in single-buffer mode put single chunk in the file and let the main thread read the file
1128  InputChunk* newChunk;
1129  //should be available immediately
1130  while (!freeChunks_.try_pop(newChunk)) {
1131  usleep(100000);
1132  if (quit_threads_.load(std::memory_order_relaxed))
1133  break;
1134  }
1135 
1136  std::unique_lock<std::mutex> lkw(mWakeup_);
1137 
1138  unsigned int toRead = eventChunkSize_;
1139  if (fileSize % eventChunkSize_)
1140  toRead = fileSize % eventChunkSize_;
1141  newChunk->reset(0, toRead, 0);
1142  newChunk->readComplete_ = true;
1143 
1144  //push file and wakeup main thread
1145  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1146  ls,
1147  rawFile,
1148  !fileListMode_,
1149  rawFd,
1150  fileSize,
1151  rawHeaderSize,
1152  1,
1153  eventsInNewFile,
1154  this));
1155  newInputFile->chunks_[0] = newChunk;
1157  fileQueue_.push(std::move(newInputFile));
1158  cvWakeup_.notify_one();
1159  }
1160  }
1161  }
1162  if (fms_)
1164  //make sure threads finish reading
1165  unsigned numFinishedThreads = 0;
1166  while (numFinishedThreads < workerThreads_.size()) {
1167  unsigned int tid;
1168  while (!workerPool_.try_pop(tid)) {
1169  usleep(10000);
1170  }
1171  std::unique_lock<std::mutex> lk(mReader_);
1172  thread_quit_signal[tid] = true;
1173  cvReader_[tid]->notify_one();
1174  numFinishedThreads++;
1175  }
1176  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1177  workerThreads_[i]->join();
1178  delete workerThreads_[i];
1179  }
1180 }
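The chunk bookkeeping in readSupervisor() (listing lines 1020-1022 and 1092-1095) and the file-broker retry delay (lines 963-967) are plain integer arithmetic. The following is a minimal standalone sketch of both, assuming nothing beyond the standard library. ChunkSpan, planChunks and backoffSleepUs are hypothetical names that do not exist in FedRawDataInputSource, and the bit shift stands in for the pow(2, backoff_exp) call of the original.

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical helper type: byte range of one chunk inside a raw file.
struct ChunkSpan {
  std::uint64_t offset;
  std::uint64_t length;
};

// Split a file into fixed-size chunks; only the last chunk may be shorter.
// Mirrors the neededChunks / toRead arithmetic of the listing.
std::vector<ChunkSpan> planChunks(std::uint64_t fileSize, std::uint64_t chunkSize) {
  std::vector<ChunkSpan> spans;
  if (chunkSize == 0)
    return spans;  // guard added for the sketch; not in the original
  std::uint64_t needed = fileSize / chunkSize;
  if (fileSize % chunkSize)
    ++needed;  // partial trailing chunk
  for (std::uint64_t i = 0; i < needed; ++i) {
    std::uint64_t length = chunkSize;
    if (i == needed - 1 && fileSize % chunkSize)
      length = fileSize % chunkSize;
    spans.push_back({i * chunkSize, length});
  }
  return spans;
}

// Capped exponential backoff: 100 ms * 2^exp, with exp limited to 4 (1.6 s).
// Returns the sleep time in microseconds and advances the exponent.
unsigned int backoffSleepUs(int& backoffExp) {
  backoffExp = std::min(4, std::max(0, backoffExp));
  unsigned int sleepUs = 100000u << backoffExp;
  ++backoffExp;
  return sleepUs;
}
```

For a 10-byte file and 4-byte chunks this gives the spans (0, 4), (4, 4) and (8, 2). The delay sequence is 0.1 s, 0.2 s, 0.4 s, 0.8 s and then 1.6 s for every further retry, until a status other than noFile resets the exponent to 0.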
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1182 of file FedRawDataInputSource.cc.

References InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, change_name::diff, end, eventChunkBlock_, FrontierConditions_GlobalTag_cff::file, InputChunk::fileIndex_, InputFile::fileName_, InputFile::fileSize_, dqmdumpme::first, mps_fire::i, dqmdumpme::last, LogDebug, min(), mReader_, fileCollector::now, numConcurrentReads_, InputChunk::offset_, InputFile::rawFd_, InputFile::rawHeaderSize_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1182  {
1183  bool init = true;
1184  threadInit_.exchange(true, std::memory_order_acquire);
1185 
1186  while (true) {
1187  tid_active_[tid] = false;
1188  std::unique_lock<std::mutex> lk(mReader_);
1189  workerJob_[tid].first = nullptr;
1190  workerJob_[tid].second = nullptr;
1191 
1192  assert(!thread_quit_signal[tid]); //should never get it here
1193  workerPool_.push(tid);
1194 
1195  if (init) {
1196  std::unique_lock<std::mutex> lk(startupLock_);
1197  init = false;
1198  startupCv_.notify_one();
1199  }
1200  cvReader_[tid]->wait(lk);
1201 
1202  if (thread_quit_signal[tid])
1203  return;
1204  tid_active_[tid] = true;
1205 
1206  InputFile* file;
1207  InputChunk* chunk;
1208 
1209  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1210 
1211  file = workerJob_[tid].first;
1212  chunk = workerJob_[tid].second;
1213 
1214  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1215  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != 0) ? file->rawHeaderSize_ : 0;
1216 
1217  //if only one worker thread exists, use single fd for all operations
1218  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1219  int fileDescriptor;
1220  bool fileOpenedHere = false;
1221 
1222  if (numConcurrentReads_ == 1) {
1223  fileDescriptor = file->rawFd_;
1224  if (fileDescriptor == -1) {
1225  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1226  fileOpenedHere = true;
1227  file->rawFd_ = fileDescriptor;
1228  }
1229  } else {
1230  if (chunk->offset_ == 0) {
1231  fileDescriptor = file->rawFd_;
1232  file->rawFd_ = -1;
1233  if (fileDescriptor == -1) {
1234  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1235  fileOpenedHere = true;
1236  }
1237  } else {
1238  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1239  fileOpenedHere = true;
1240  }
1241  }
1242 
1243  if (fileDescriptor < 0) {
1244  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1245  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1246  setExceptionState_ = true;
1247  continue;
1248  }
1249  if (fileOpenedHere) { //fast forward to this chunk position
1250  off_t pos = 0;
1251  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1252  if (pos == -1) {
1253  edm::LogError("FedRawDataInputSource")
1254  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1255  << chunk->offset_ << " error: " << strerror(errno);
1256  setExceptionState_ = true;
1257  continue;
1258  }
1259  }
1260 
1261  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1262  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1263 
1264  unsigned int skipped = bufferLeft;
1266  for (unsigned int i = 0; i < readBlocks_; i++) {
1267  ssize_t last;
1268 
1269  //protect against reading into next block
1270  last = ::read(
1271  fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1272 
1273  if (last < 0) {
1274  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1275  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1276  setExceptionState_ = true;
1277  break;
1278  }
1279  if (last > 0)
1280  bufferLeft += last;
1281  if (last < eventChunkBlock_) { //last read
1282  //check if this is last block, then total read size must match file size
1283  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1284  edm::LogError("FedRawDataInputSource")
1285  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1286  << " expectedChunkSize:" << chunk->usedSize_
1287  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1288  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1289  setExceptionState_ = true;
1290  }
1291  break;
1292  }
1293  }
1294  if (setExceptionState_)
1295  continue;
1296 
1298  auto diff = end - start;
1299  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1300  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1301  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1302  << " GB/s)";
1303 
1304  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1305  close(fileDescriptor);
1306  fileDescriptor = -1;
1307  if (numConcurrentReads_ == 1)
1308  file->rawFd_ = -1;
1309  }
1310  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1311  close(fileDescriptor);
1312 
1313  //detect FRD event version. Skip file Header if it exists
1314  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1315  detectedFRDversion_ = *((uint32*)(chunk->buf_ + file->rawHeaderSize_));
1316  }
1317  assert(detectedFRDversion_ <= 5);
1318  chunk->readComplete_ =
1319  true; //this is atomic to secure the sequential buffer fill before becoming available for processing
1320  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1321  }
1322 }
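readWorker() fills a chunk buffer with repeated ::read calls of at most eventChunkBlock_ bytes and raises an error when the total does not match the expected chunk size. The sketch below shows that pattern in isolation. readInBlocks is a hypothetical helper, not part of the class. Unlike the original, which treats the first short read as the final block, it keeps reading until the expected size or end of file is reached.

```cpp
#include <algorithm>
#include <cstddef>
#include <unistd.h>

// Read exactly `expected` bytes from `fd` into `buf`, at most `blockSize`
// bytes per ::read call. Returns the number of bytes read, or -1 on a read
// error or when the input ends before `expected` bytes were delivered.
ssize_t readInBlocks(int fd, unsigned char* buf, std::size_t expected, std::size_t blockSize) {
  std::size_t filled = 0;
  while (filled < expected) {
    // never request more than what is left of this chunk
    std::size_t want = std::min(expected - filled, blockSize);
    ssize_t got = ::read(fd, buf + filled, want);
    if (got < 0)
      return -1;  // errno describes the failure
    if (got == 0)
      break;  // end of input
    filled += static_cast<std::size_t>(got);
  }
  return filled == expected ? static_cast<ssize_t>(filled) : -1;
}
```

The caller is expected to have positioned the descriptor first, as the listing does with lseek(fileDescriptor, chunk->offset_, SEEK_SET) when the file was opened by the worker itself.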
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1458 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNextEvent(), and getNextEvent().

1458  {
1459  std::lock_guard<std::mutex> lock(monlock_);
1460  auto itr = sourceEventsReport_.find(lumi);
1461  if (itr != sourceEventsReport_.end())
1462  itr->second += events;
1463  else
1464  sourceEventsReport_[lumi] = events;
1465 }
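The per-lumisection accounting above is a mutex-protected map from lumisection number to event count. A minimal sketch of that pattern follows. LumiEventCounter is a hypothetical class; its get() member borrows the signature of getEventReport() from the class summary, but the body shown here is an assumption, not the CMSSW implementation.

```cpp
#include <map>
#include <mutex>
#include <utility>

// Hypothetical standalone counter: events seen per lumisection.
class LumiEventCounter {
public:
  // Add `events` to the running total of `lumi`.
  void report(unsigned int lumi, unsigned int events) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[lumi] += events;  // operator[] starts a new entry at 0
  }

  // Return {found, count}; optionally drop the entry once it has been read.
  std::pair<bool, unsigned int> get(unsigned int lumi, bool erase) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto itr = counts_.find(lumi);
    if (itr == counts_.end())
      return {false, 0};
    unsigned int count = itr->second;
    if (erase)
      counts_.erase(itr);
    return {true, count};
  }

private:
  std::mutex mutex_;
  std::map<unsigned int, unsigned int> counts_;
};
```

Because std::map::operator[] value-initializes missing entries, the find/else branch of the listing collapses into a single += here.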
void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 735 of file FedRawDataInputSource.cc.

735 {}
void FedRawDataInputSource::threadError ( )
private

Definition at line 1324 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by getNextEvent().

1324  {
1325  quit_threads_ = true;
1326  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1327 }
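threadError() only raises quit_threads_ and throws. The wait loops in readSupervisor() (listing lines 1051-1057 and 1070-1076) are what notice the flag, checking it once per 100 ms sleep. A minimal sketch of that polling pattern follows; waitUntilAvailable and its tryPop callable are hypothetical stand-ins for the try_pop calls on workerPool_ and freeChunks_.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Poll `tryPop` until it succeeds, sleeping `pause` between attempts.
// Gives up and returns false as soon as `quit` is observed to be set.
template <typename TryPop>
bool waitUntilAvailable(TryPop tryPop, const std::atomic<bool>& quit, std::chrono::microseconds pause) {
  while (!tryPop()) {
    std::this_thread::sleep_for(pause);
    // relaxed load is enough: the flag is a stop hint and guards no data
    if (quit.load(std::memory_order_relaxed))
      return false;
  }
  return true;
}
```

A caller that gets false back should stop scheduling work, which is what the stop = true; break; sequence does in the listing.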

Friends And Related Function Documentation

friend struct InputChunk
friend

Definition at line 38 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend struct InputFile
friend

Definition at line 37 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by read().

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::currentLumiSection_
private

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = 0
private

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 80 of file FedRawDataInputSource.h.

uint32 FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), fillFEDRawDataCollection(), and getNextEvent().

unsigned int FedRawDataInputSource::eventChunkBlock_
private

unsigned int FedRawDataInputSource::eventChunkSize_
private

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 120 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 156 of file FedRawDataInputSource.h.

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private

evf::FastMonitoringService* FedRawDataInputSource::fms_ = 0
private

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 105 of file FedRawDataInputSource.h.

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 85 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 112 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

unsigned int FedRawDataInputSource::readBlocks_
private

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and FedRawDataInputSource().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 134 of file FedRawDataInputSource.h.

Referenced by checkNextEvent(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private

std::mutex FedRawDataInputSource::startupLock_
private

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::useFileBroker_
private

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().
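workerPool_ holds the IDs of idle reader threads. A worker pushes its own ID and parks on its condition variable; the supervisor pops an ID, stores a job for it and notifies that worker. The sketch below reproduces this handshake in miniature under stated assumptions: TinyWorkerPool and sumWithPool are hypothetical, a mutex-guarded std::queue replaces tbb::concurrent_queue, jobs are std::function objects rather than (InputFile*, InputChunk*) pairs, and the waits use predicates to tolerate spurious wakeups, which the bare wait(lk) in readWorker() does not.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical miniature of the supervisor/worker handshake; not CMSSW code.
class TinyWorkerPool {
public:
  explicit TinyWorkerPool(unsigned int n) : jobs_(n), quit_(n, false), wake_(n) {
    for (unsigned int tid = 0; tid < n; ++tid)
      threads_.emplace_back([this, tid] { run(tid); });
  }

  // Shutdown: claim each worker once it is idle, flag it, wake it, then join.
  ~TinyWorkerPool() {
    for (std::size_t done = 0; done < threads_.size(); ++done) {
      unsigned int tid = claimIdleWorker();
      {
        std::lock_guard<std::mutex> lk(m_);
        quit_[tid] = true;
      }
      wake_[tid].notify_one();
    }
    for (auto& t : threads_)
      t.join();
  }

  // Supervisor side: take an idle worker, hand it the job, wake it up.
  void submit(std::function<void()> job) {
    unsigned int tid = claimIdleWorker();
    {
      std::lock_guard<std::mutex> lk(m_);
      jobs_[tid] = std::move(job);
    }
    wake_[tid].notify_one();
  }

private:
  unsigned int claimIdleWorker() {
    std::unique_lock<std::mutex> lk(m_);
    idleCv_.wait(lk, [this] { return !idle_.empty(); });
    unsigned int tid = idle_.front();
    idle_.pop();
    return tid;
  }

  // Worker side: clear the job slot, announce idleness, sleep until woken.
  void run(unsigned int tid) {
    std::unique_lock<std::mutex> lk(m_);
    while (true) {
      jobs_[tid] = nullptr;
      idle_.push(tid);
      idleCv_.notify_one();
      wake_[tid].wait(lk, [this, tid] { return quit_[tid] || static_cast<bool>(jobs_[tid]); });
      if (quit_[tid])
        return;
      std::function<void()> job = std::move(jobs_[tid]);
      lk.unlock();  // run the job without holding the shared mutex
      job();
      lk.lock();
    }
  }

  std::mutex m_;
  std::condition_variable idleCv_;
  std::queue<unsigned int> idle_;
  std::vector<std::function<void()>> jobs_;
  std::vector<bool> quit_;
  std::vector<std::condition_variable> wake_;
  std::vector<std::thread> threads_;
};

// Usage example: add 1..upTo on `workers` threads.
int sumWithPool(unsigned int workers, int upTo) {
  std::atomic<int> sum{0};
  {
    TinyWorkerPool pool(workers);
    for (int i = 1; i <= upTo; ++i)
      pool.submit([&sum, i] { sum += i; });
  }  // the destructor returns only after every job has finished
  return sum.load();
}
```

Because a worker re-enters the idle queue only after its job has returned, claiming every ID once during shutdown also serves as the "make sure threads finish reading" barrier seen at the end of readSupervisor().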

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private