CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSource & operator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks an event as cached in the source (counterpart of eventCached()). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &rawData, bool &tcdsInRange)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint16_t detectedFRDversion_ = 0
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
 
uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 42 of file FedRawDataInputSource.h.

Member Typedef Documentation

◆ ReaderInfo

Definition at line 140 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

◆ FedRawDataInputSource()

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 51 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), MAXTCDSuTCAFEDID_, MINTCDSuTCAFEDID_, numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, threadInit_, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  eventID_(),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
159  std::unique_lock<std::mutex> lk(startupLock_);
160  //issue a memory fence here and in threads (constructor was segfaulting without this)
161  thread_quit_signal.push_back(false);
162  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
163  cvReader_.push_back(new std::condition_variable);
164  tid_active_.push_back(0);
165  threadInit_.store(false, std::memory_order_release);
166  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
167  startupCv_.wait(lk);
168  }
169 
170  runAuxiliary()->setProcessHistoryID(processHistoryID_);
171 }
std::vector< std::string > fileNames_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
bool useFileBroker() const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::pair< InputFile *, InputChunk * > ReaderInfo
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static Timestamp beginOfTime()
Definition: Timestamp.h:77
const std::vector< unsigned int > testTCDSFEDRange_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
std::condition_variable startupCv_
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
std::atomic< bool > threadInit_
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
void setInState(FastMonState::InputState inputState)

◆ ~FedRawDataInputSource()

FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 173 of file FedRawDataInputSource.cc.

References currentFile_, cvReader_, evf::FastMonitoringService::exceptionDetected(), filesToDelete_, fms_, mps_fire::i, evf::FastMonitoringService::isExceptionOnData(), mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

173  {
174  quit_threads_ = true;
175 
176  //delete any remaining open files
177  if (!fms_ || !fms_->exceptionDetected()) {
178  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
179  it->second.reset();
180  } else {
181  //skip deleting files with exception
182  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
183  //it->second->unsetDeleteFile();
184  if (fms_->isExceptionOnData(it->second->lumi_))
185  it->second->unsetDeleteFile();
186  else
187  it->second.reset();
188  }
189  //disable deleting current file with exception
190  if (currentFile_.get())
191  if (fms_->isExceptionOnData(currentFile_->lumi_))
192  currentFile_->unsetDeleteFile();
193  }
194 
196  readSupervisorThread_->join();
197  } else {
198  //join aux threads in case the supervisor thread was not started
199  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
200  std::unique_lock<std::mutex> lk(mReader_);
201  thread_quit_signal[i] = true;
202  cvReader_[i]->notify_one();
203  lk.unlock();
204  workerThreads_[i]->join();
205  delete workerThreads_[i];
206  }
207  }
208  for (unsigned int i = 0; i < numConcurrentReads_; i++)
209  delete cvReader_[i];
210  /*
211  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
212  InputChunk *ch;
213  while (!freeChunks_.try_pop(ch)) {}
214  delete ch;
215  }
216  */
217 }
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
bool isExceptionOnData(unsigned int ls)
std::vector< std::condition_variable * > cvReader_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::vector< std::thread * > workerThreads_
std::vector< unsigned int > thread_quit_signal
evf::FastMonitoringService * fms_
std::unique_ptr< InputFile > currentFile_

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next FedRawDataInputSource::checkNext ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 244 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

244  {
246  //this thread opens new files and dispatches reading to worker readers
247  //threadInit_.store(false,std::memory_order_release);
248  std::unique_lock<std::mutex> lk(startupLock_);
249  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
251  startupCv_.wait(lk);
252  }
253  //signal hltd to start event accounting
254  if (!currentLumiSection_)
257  switch (nextEvent()) {
259  //maybe create EoL file in working directory before ending run
260  struct stat buf;
261  if (!useFileBroker_ && currentLumiSection_ > 0) {
262  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
263  if (eolFound) {
265  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
266  if (!found) {
268  int eol_fd =
269  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
270  close(eol_fd);
272  }
273  }
274  }
275  //also create EoR file in FU data directory
276  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
277  if (!eorFound) {
278  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
279  O_RDWR | O_CREAT,
280  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
281  close(eor_fd);
282  }
284  eventsThisLumi_ = 0;
286  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
287  return Next::kStop;
288  }
290  //this is not reachable
291  return Next::kEvent;
292  }
294  //std::cout << "--------------NEW LUMI---------------" << std::endl;
295  return Next::kEvent;
296  }
297  default: {
298  if (!getLSFromFilename_) {
299  //get new lumi from file header
300  if (event_->lumi() > currentLumiSection_) {
302  eventsThisLumi_ = 0;
304  }
305  }
308  else
309  eventRunNumber_ = event_->run();
310  L1EventID_ = event_->event();
311 
312  setEventCached();
313 
314  return Next::kEvent;
315  }
316  }
317 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoRFilePathOnFU() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Marks an event as cached in the source (counterpart of eventCached()).
Definition: InputSource.h:362
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::EvFDaqDirector::FileStatus nextEvent()

◆ exceptionState()

bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 70 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance().

◆ fillDescriptions()

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 219 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), and submitPVResolutionJobs::desc.

219  {
221  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
222  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
223  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
224  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
225  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
226  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
227  ->setComment("Maximum number of simultaneously buffered raw files");
228  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
229  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
230  desc.addUntracked<bool>("verifyChecksum", true)
231  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
232  desc.addUntracked<bool>("useL1EventID", false)
233  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
234  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
235  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
236  desc.addUntracked<bool>("fileListMode", false)
237  ->setComment("Use fileNames parameter to directly specify raw files to open");
238  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
239  ->setComment("file list used when fileListMode is enabled");
240  desc.setAllowAnything();
241  descriptions.add("source", desc);
242 }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ fillFEDRawDataCollection()

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData,
bool &  tcdsInRange 
)
private

Definition at line 702 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MAXTCDSuTCAFEDID, MAXTCDSuTCAFEDID_, FEDNumbering::MINTCDSuTCAFEDID, MINTCDSuTCAFEDID_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, l1tstage2_dqm_sourceclient-live_cfg::rawData, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and protons_cff::time.

Referenced by read().

702  {
704  timeval stv;
705  gettimeofday(&stv, nullptr);
706  time = stv.tv_sec;
707  time = (time << 32) + stv.tv_usec;
708  edm::Timestamp tstamp(time);
709 
710  uint32_t eventSize = event_->eventSize();
711  unsigned char* event = (unsigned char*)event_->payload();
712  GTPEventID_ = 0;
713  tcds_pointer_ = nullptr;
714  tcdsInRange = false;
715  uint16_t selectedTCDSFed = 0;
716  while (eventSize > 0) {
717  assert(eventSize >= FEDTrailer::length);
718  eventSize -= FEDTrailer::length;
719  const FEDTrailer fedTrailer(event + eventSize);
720  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
721  assert(eventSize >= fedSize - FEDHeader::length);
722  eventSize -= (fedSize - FEDHeader::length);
723  const FEDHeader fedHeader(event + eventSize);
724  const uint16_t fedId = fedHeader.sourceID();
726  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
727  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
728  if (!selectedTCDSFed) {
729  selectedTCDSFed = fedId;
730  tcds_pointer_ = event + eventSize;
732  tcdsInRange = true;
733  }
734  } else
735  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
736  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
737  }
739  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
740  GTPEventID_ = evf::evtn::get(event + eventSize, true);
741  else
742  GTPEventID_ = evf::evtn::get(event + eventSize, false);
743  //evf::evtn::evm_board_setformat(fedSize);
744  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
745  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
746  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
747  }
748  //take event ID from GTPE FED
750  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
751  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
752  }
753  }
754  FEDRawData& fedData = rawData.FEDData(fedId);
755  fedData.resize(fedSize);
756  memcpy(fedData.data(), event + eventSize, fedSize);
757  }
758  assert(eventSize == 0);
759 
760  return tstamp;
761 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
static const uint32_t length
Definition: FEDTrailer.h:57
unsigned int get(const unsigned char *, bool)
static const uint32_t length
Definition: FEDHeader.h:54
assert(be >=bs)
void resize(size_t newsize)
Definition: FEDRawData.cc:28
unsigned long long TimeValue_t
Definition: Timestamp.h:21
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
Definition: event.py:1

◆ getEventReport()

std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1588 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1588  {
  // Looks up the value stored for `lumi` in sourceEventsReport_.
  // Returns (true, stored value) when an entry exists, and removes that entry
  // when `erase` is set; returns (false, 0) when no entry exists.
  // monlock_ is held for the duration of the call.
1589  std::lock_guard<std::mutex> lock(monlock_);
1590  auto itr = sourceEventsReport_.find(lumi);
1591  if (itr != sourceEventsReport_.end()) {
  // copy the value out first: erase() below invalidates itr
1592  std::pair<bool, unsigned int> ret(true, itr->second);
1593  if (erase)
1594  sourceEventsReport_.erase(itr);
1595  return ret;
1596  } else
1597  return std::pair<bool, unsigned int>(false, 0);
1598 }
ret
prodAgent to be discontinued
std::map< unsigned int, unsigned int > sourceEventsReport_

◆ getFile()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1635 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1638  {
  // Hands out the next entry of fileNames_ (file-list mode).
  //  ls           - set to the lumisection number parsed from the file stem, or
  //                 to 1 + loopModeIterationInc_ when fileListLoopMode_ is set
  //  nextFile     - set to the file name with a leading "file://" or "file:" removed
  //  fsize,
  //  lockWaitTime - not modified by the visible statements (assignments are commented out)
  // When the list is exhausted in loop mode, the index is reset and the function
  // calls itself to start over.
  // NOTE(review): listing lines 1660, 1663 and 1666 are absent from this rendering;
  // presumably they hold the return statements / loop counter update - confirm
  // against FedRawDataInputSource.cc.
1639  if (fileListIndex_ < fileNames_.size()) {
1640  nextFile = fileNames_[fileListIndex_];
1641  if (nextFile.find("file://") == 0)
1642  nextFile = nextFile.substr(7);
1643  else if (nextFile.find("file:") == 0)
1644  nextFile = nextFile.substr(5);
1645  std::filesystem::path fileName = nextFile;
1646  std::string fileStem = fileName.stem().string();
  // find() returns npos (not 0/false) when there is no match, so the result
  // must be compared against npos: keep what follows the "ls" marker ...
1647  if (fileStem.find("ls") != std::string::npos)
1648  fileStem = fileStem.substr(fileStem.find("ls") + 2);
  // ... and cut at the next '_' so that only the lumisection digits remain
1649  if (fileStem.find('_') != std::string::npos)
1650  fileStem = fileStem.substr(0, fileStem.find('_'));
1651 
1652  if (!fileListLoopMode_)
  // std::stoul throws if the remaining text does not start with a number
1653  ls = std::stoul(fileStem);
1654  else //always starting from LS 1 in loop mode
1655  ls = 1 + loopModeIterationInc_;
1656 
1657  //fsize = 0;
1658  //lockWaitTime = 0;
1659  fileListIndex_++;
1661  } else {
1662  if (!fileListLoopMode_)
1664  else {
1665  //loop through files until interrupted
1667  fileListIndex_ = 0;
1668  return getFile(ls, nextFile, fsize, lockWaitTime);
1669  }
1670  }
1671 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getNextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 365 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, FRDHeaderMaxVersion, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, setMonState(), singleBufferMode_, mps_update::status, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

365  {
  // Positions the source on the next event of the current input file and leaves
  // a view of it in event_. The visible branches also handle: status entries
  // taken from fileQueue_ (run abort, new lumi, new file), empty files, finished
  // files, the raw file header, and optional checksum verification.
  // Throws cms::Exception on aborted runs, truncated input, oversized events,
  // event-count mismatches and checksum mismatches.
  // NOTE(review): a number of listing lines (e.g. 369-370, 375, 378-379, 623) are
  // absent from this rendering; the comments describe only the statements shown.
366  if (setExceptionState_)
367  threadError();
  // no file in progress: try to take the next entry from fileQueue_
368  if (!currentFile_.get()) {
371  if (!fileQueue_.try_pop(currentFile_)) {
372  //sleep until wakeup (only in single-buffer mode) or timeout
373  std::unique_lock<std::mutex> lkw(mWakeup_);
374  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
376  }
  // dispatch on the status carried by the dequeued entry
377  status = currentFile_->status_;
380  currentFile_.reset();
381  return status;
382  } else if (status == evf::EvFDaqDirector::runAbort) {
383  throw cms::Exception("FedRawDataInputSource::getNextEvent")
384  << "Run has been aborted by the input source reader thread";
385  } else if (status == evf::EvFDaqDirector::newLumi) {
387  if (getLSFromFilename_) {
388  if (currentFile_->lumi_ > currentLumiSection_) {
390  eventsThisLumi_ = 0;
392  }
393  } else { //let this be picked up from next event
395  }
396  currentFile_.reset();
397  return status;
398  } else if (status == evf::EvFDaqDirector::newFile) {
400  } else
401  assert(false);
402  }
404 
405  //file is empty
406  if (!currentFile_->fileSize_) {
408  //try to open new lumi
409  assert(currentFile_->nChunks_ == 0);
410  if (getLSFromFilename_)
411  if (currentFile_->lumi_ > currentLumiSection_) {
413  eventsThisLumi_ = 0;
415  }
416  //immediately delete empty file
417  currentFile_.reset();
419  }
420 
421  //file is finished
422  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
424  //release last chunk (it is never released elsewhere)
425  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
  // a negative nEvents_ disables this cross-check of the processed event count
426  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
427  throw cms::Exception("FedRawDataInputSource::getNextEvent")
428  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
429  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
430  }
431  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
432  if (singleBufferMode_) {
433  std::unique_lock<std::mutex> lkw(mWakeup_);
434  cvWakeup_.notify_one();
435  }
436  bufferInputRead_ = 0;
438  //put the file in pending delete list;
439  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
  // ownership of the file object moves into filesToDelete_, keyed by currentFileIndex_
440  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
441  } else {
442  //in single-thread and stream jobs, events are already processed
443  currentFile_.reset();
444  }
446  }
447 
448  //handle RAW file header
449  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
450  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
  // strictly smaller than the header is an error; exactly equal means a header-only file
451  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
452  throw cms::Exception("FedRawDataInputSource::getNextEvent")
453  << "Premature end of input file while reading file header";
454 
455  edm::LogWarning("FedRawDataInputSource")
456  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
457  if (getLSFromFilename_)
458  if (currentFile_->lumi_ > currentLumiSection_) {
460  eventsThisLumi_ = 0;
462  }
463  }
464 
465  //advance buffer position to skip file header (chunk will be acquired later)
466  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
467  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
468  }
469 
470  //file is too short
  // the remaining bytes must hold at least one event header of the detected FRD version
471  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
472  throw cms::Exception("FedRawDataInputSource::getNextEvent")
473  << "Premature end of input file while reading event header";
474  }
475  if (singleBufferMode_) {
476  //should already be there
  // poll in 10 ms steps, surfacing errors flagged by the file's parent or by this source
478  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
479  usleep(10000);
480  if (currentFile_->parent_->exceptionState() || setExceptionState_)
481  currentFile_->parent_->threadError();
482  }
484 
  // in this mode the visible code always addresses chunks_[0]
485  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
486 
487  //conditions when read amount is not sufficient for the header to fit
491 
  // version not known yet: take it from the first 16 bits at the current position
492  if (detectedFRDversion_ == 0) {
493  detectedFRDversion_ = *((uint16_t*)dataPosition);
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << "Unknown FRD version -: " << detectedFRDversion_;
498  }
499 
500  //recalculate chunk position
501  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
503  throw cms::Exception("FedRawDataInputSource::getNextEvent")
504  << "Premature end of input file while reading event header";
505  }
506  }
507 
508  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
  // reject events larger than one chunk
509  if (event_->size() > eventChunkSize_) {
510  throw cms::Exception("FedRawDataInputSource::getNextEvent")
511  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
512  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
513  << " bytes";
514  }
515 
  // payload size = total event size minus the version-dependent header size
516  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
517 
518  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
519  throw cms::Exception("FedRawDataInputSource::getNextEvent")
520  << "Premature end of input file while reading event data";
521  }
522  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
524  //recalculate chunk position
525  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
526  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
527  }
  // step both the file-level and the chunk-level cursor past this event
528  currentFile_->bufferPosition_ += event_->size();
529  currentFile_->chunkPosition_ += event_->size();
530  //last chunk is released when this function is invoked next time
531 
532  }
533  //multibuffer mode:
534  else {
535  //wait for the current chunk to become added to the vector
537  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
538  usleep(10000);
539  if (setExceptionState_)
540  threadError();
541  }
543 
544  //check if header is at the boundary of two chunks
545  chunkIsFree_ = false;
546  unsigned char* dataPosition;
547 
548  //read header, copy it to a single chunk if necessary
549  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
550 
551  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
  // same size limit as in single-buffer mode
552  if (event_->size() > eventChunkSize_) {
553  throw cms::Exception("FedRawDataInputSource::getNextEvent")
554  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
555  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
556  << " bytes";
557  }
558 
559  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
560 
561  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
562  throw cms::Exception("FedRawDataInputSource::getNextEvent")
563  << "Premature end of input file while reading event data";
564  }
565 
566  if (chunkEnd) {
567  //header was at the chunk boundary, we will have to move payload as well
568  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
  // chunkIsFree_ is consumed by read(), which then pushes a chunk back to freeChunks_
569  chunkIsFree_ = true;
570  } else {
571  //header was contiguous, but check if payload fits the chunk
572  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
573  //rewind to header start position
575  //copy event to a chunk start and move pointers
576 
578 
579  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
580 
582 
583  assert(chunkEnd);
584  chunkIsFree_ = true;
585  //header is moved
586  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
587  } else {
588  //everything is in a single chunk, only move pointers forward
589  chunkEnd = currentFile_->advance(dataPosition, msgSize);
590  assert(!chunkEnd);
591  chunkIsFree_ = false;
592  }
593  }
594  } //end multibuffer mode
596 
  // checksum selection by event version: >= 5 uses CRC32C, 3-4 use Adler32,
  // lower versions are not verified
597  if (verifyChecksum_ && event_->version() >= 5) {
598  uint32_t crc = 0;
599  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
600  if (crc != event_->crc32c()) {
601  if (fms_)
603  throw cms::Exception("FedRawDataInputSource::getNextEvent")
604  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
605  << crc;
606  }
607  } else if (verifyChecksum_ && event_->version() >= 3) {
  // adler32(0L, Z_NULL, 0) yields the initial Adler-32 value
608  uint32_t adler = adler32(0L, Z_NULL, 0);
609  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
610 
611  if (adler != event_->adler32()) {
612  if (fms_)
614  throw cms::Exception("FedRawDataInputSource::getNextEvent")
615  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
616  << adler;
617  }
618  }
620 
  // counted here; compared against nEvents_ once the file is finished (see above)
621  currentFile_->nProcessed_++;
622 
624 }
std::condition_variable cvWakeup_
constexpr size_t FRDHeaderMaxVersion
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void setMonState(evf::FastMonState::InputState state)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::unique_ptr< FRDEventMsgView > event_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511

◆ initFileList()

long FedRawDataInputSource::initFileList ( )
private

Definition at line 1600 of file FedRawDataInputSource.cc.

References a, b, mps_fire::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, fileNames_, castor_dqm_sourceclient_file_cfg::path, jetUpdater_cfi::sort, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1600  {
  // Sorts fileNames_ in ascending order of the part of each name starting at its
  // last '/' (the whole name if it has none), then tries to read a run number
  // from a file stem of the form "run<digits>[_...]".
  // Returns the parsed run number, or -1 when the list is empty, the stem does
  // not start with "run", or the digits cannot be parsed.
  // NOTE(review): listing line 1611 (definition of `fileName`) is absent from this rendering.
  // the comparator takes copies on purpose: it trims them before comparing
1601  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1602  if (a.rfind('/') != std::string::npos)
1603  a = a.substr(a.rfind('/'));
1604  if (b.rfind('/') != std::string::npos)
1605  b = b.substr(b.rfind('/'));
1606  return b > a;
1607  });
1608 
1609  if (!fileNames_.empty()) {
1610  //get run number from first file in the vector
1612  std::string fileStem = fileName.stem().string();
1613  if (fileStem.find("file://") == 0)
1614  fileStem = fileStem.substr(7);
1615  else if (fileStem.find("file:") == 0)
1616  fileStem = fileStem.substr(5);
  // `end` is npos when the stem has no '_'; substr() below then takes the rest of the stem
1617  auto end = fileStem.find('_');
1618 
1619  if (fileStem.find("run") == 0) {
1620  std::string runStr = fileStem.substr(3, end - 3);
1621  try {
1622  //get long to support test run numbers < 2^32
1623  long rval = std::stol(runStr);
1624  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1625  return rval;
1626  } catch (const std::exception&) {
  // parse failure is only logged; control falls through to the -1 return
1627  edm::LogWarning("FedRawDataInputSource")
1628  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1629  }
1630  }
1631  }
1632  return -1;
1633 }
std::vector< std::string > fileNames_
Log< level::Info, false > LogInfo
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
Log< level::Warning, false > LogWarning

◆ maybeOpenNewLumiSection()

void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 319 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNext(), and getNextEvent().

319  {
  // Opens `lumiSection` unless a luminosityBlockAuxiliary already exists for that
  // same lumisection number. When opening: (without file broker) EoLS/BoLS marker
  // files are handled via daqDirector_, currentLumiSection_ is updated, and a new
  // luminosity block auxiliary stamped with the current time is installed.
  // NOTE(review): listing lines 323, 327, 332, 340 and 346 are absent from this
  // rendering (they define `fuEoLS` and `lumiBlockAuxiliary`, among others).
320  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
321  if (!useFileBroker_) {
322  if (currentLumiSection_ > 0) {
324  struct stat buf;
  // stat() == 0 means the EoLS file path already exists
325  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
326  if (!found) {
  // create the file with read/write bits for user, group and others, then close it at once
  // NOTE(review): the descriptor returned by open() is not checked before close()
328  int eol_fd =
329  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
330  close(eol_fd);
331  daqDirector_->createBoLSFile(lumiSection, false);
333  }
334  } else
335  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
336  }
337 
338  currentLumiSection_ = lumiSection;
339 
341 
  // lumisection open time: wall-clock time expressed in microseconds (sec * 1e6 + usec)
342  timeval tv;
343  gettimeofday(&tv, nullptr);
344  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
345 
347  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
348 
349  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
350  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
351 
352  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
353  }
354 }
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
edm::ProcessHistoryID processHistoryID_
Log< level::Info, false > LogInfo
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
evf::EvFDaqDirector * daqDirector_
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:235
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ nextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 356 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNext().

356  {
  // Returns the status obtained from getNextEvent() (see the References list),
  // leaving the retry loop early when a shutdown has been requested.
  // NOTE(review): listing lines 357-358 are absent from this rendering; presumably
  // they declare `status` and open the loop that repeats while no file is
  // available - confirm against FedRawDataInputSource.cc.
359  if (edm::shutdown_flag.load(std::memory_order_relaxed))
360  break;
361  }
362  return status;
363 }
volatile std::atomic< bool > shutdown_flag
def load(fileName)
Definition: svgfig.py:547
evf::EvFDaqDirector::FileStatus getNextEvent()

◆ read()

void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 626 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, runTheMatrix::const, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), event_, eventID_, eventRunNumber_, eventsThisLumi_, Exception, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, evf::FastMonitoringService::isExceptionOnData(), L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), l1tstage2_dqm_sourceclient-live_cfg::rawData, setMonState(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), readNextChunkIntoBuffer(), and readWorker().

626  {
  // Fills a FEDRawDataCollection for the current event, creates the event through
  // makeEvent() with an auxiliary chosen by one of three branches below, and then
  // does per-event bookkeeping: event counter, stream-to-file tracking, periodic
  // release of finished files and return of a free chunk.
  // Throws cms::Exception when neither a TCDS nor a GTP event ID is available.
  // NOTE(review): several listing lines (e.g. 627, 633-634, 642-643, 649-652, 664)
  // are absent from this rendering, including the definitions of `aux`.
628  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
629  bool tcdsInRange;
630  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
631 
  // branch 1: event ID taken from the L1 event ID
632  if (useL1EventID_) {
635  aux.setProcessHistoryID(processHistoryID_);
636  makeEvent(eventPrincipal, aux);
  // branch 2: no TCDS record was found, a GTP event ID is required
637  } else if (tcds_pointer_ == nullptr) {
638  if (!GTPEventID_) {
639  throw cms::Exception("FedRawDataInputSource::read")
640  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
641  }
644  aux.setProcessHistoryID(processHistoryID_);
645  makeEvent(eventPrincipal, aux);
  // branch 3: build the auxiliary from the TCDS record that follows the FED header
646  } else {
647  const FEDHeader fedHeader(tcds_pointer_);
648  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
653  event_->isRealData(),
654  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
655  processGUID(),
657  !tcdsInRange);
658  aux.setProcessHistoryID(processHistoryID_);
659  makeEvent(eventPrincipal, aux);
660  }
661 
662  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
663 
665 
666  eventsThisLumi_++;
668 
669  //resize vector if needed
670  while (streamFileTracker_.size() <= eventPrincipal.streamID())
671  streamFileTracker_.push_back(-1);
672 
  // remember which file this stream is currently working on
673  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
674 
675  //this old file check runs no more often than every 10 events
  // NOTE(review): the interval is checkEvery_; "10" holds only if that is its value
676  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
677  //delete files that are not in processing
678  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
679  auto it = filesToDelete_.begin();
680  while (it != filesToDelete_.end()) {
681  bool fileIsBeingProcessed = false;
682  for (unsigned int i = 0; i < nStreams_; i++) {
683  if (it->first == streamFileTracker_.at(i)) {
684  fileIsBeingProcessed = true;
685  break;
686  }
687  }
  // files of a lumisection flagged by fms_->isExceptionOnData() are kept
688  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
689  std::string fileToDelete = it->second->fileName_;
  // erasing the list entry destroys the owning unique_ptr<InputFile>
690  it = filesToDelete_.erase(it);
691  } else
692  it++;
693  }
694  }
  // chunkIsFree_ is set by getNextEvent(); the chunk before the current one is handed back
695  if (chunkIsFree_)
696  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
697  chunkIsFree_ = false;
699  return;
700 }
static const uint32_t length
Definition: FEDHeader.h:54
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:197
std::vector< int > streamFileTracker_
ProductProvenance const & dummyProvenance() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
void setMonState(evf::FastMonState::InputState state)
BranchDescription const & branchDescription() const
std::unique_ptr< FRDEventMsgView > event_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511

◆ readNextChunkIntoBuffer()

void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1508 of file FedRawDataInputSource.cc.

References bufferInputRead_, eventChunkBlock_, eventChunkSize_, Exception, geometryDiff::file, fileDescriptor_, mps_fire::i, dqmdumpme::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

1508  {
  // Fills file->chunks_[0]->buf_ from the input file using ::read in blocks of
  // eventChunkBlock_. Opens the file (or adopts file->rawFd_) when no descriptor
  // is open, and closes it once bufferInputRead_ equals file->fileSize_.
  // Throws cms::Exception when the file cannot be opened.
  // NOTE(review): listing lines 1536, 1545, 1560 and 1565 are absent from this
  // rendering. In the visible lines the ::read result `last` is added to the size
  // counters without a check for -1.
1509  uint32_t existingSize = 0;
1510 
1511  if (fileDescriptor_ < 0) {
1512  bufferInputRead_ = 0;
1513  if (file->rawFd_ == -1) {
  // NOTE(review): lseek() runs before the fileDescriptor_ >= 0 check further down
1514  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1515  if (file->rawHeaderSize_)
1516  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1517  } else
1518  fileDescriptor_ = file->rawFd_;
1519 
1520  //skip header size in destination buffer (chunk position was already adjusted)
1521  bufferInputRead_ += file->rawHeaderSize_;
1522  existingSize += file->rawHeaderSize_;
1523 
1524  if (fileDescriptor_ >= 0)
1525  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1526  else {
1527  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1528  << "failed to open file " << std::endl
1529  << file->fileName_ << " fd:" << fileDescriptor_;
1530  }
1531  //fill chunk (skipping file header if present)
1532  for (unsigned int i = 0; i < readBlocks_; i++) {
  // the final iteration requests eventChunkBlock_ minus the current existingSize
1533  const ssize_t last = ::read(fileDescriptor_,
1534  (void*)(file->chunks_[0]->buf_ + existingSize),
1535  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1537  existingSize += last;
1538  }
1539 
1540  } else {
1541  //continue reading
1542  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1543  for (unsigned int i = 0; i < readBlocks_; i++) {
1544  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1546  existingSize += last;
1547  }
1548  } else {
1549  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
  // memmove is used because source and destination lie in the same buffer
1550  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1551  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1552 
1553  //calculate amount of data that can be added
  // the freed space (chunkPosition_ bytes) is refilled as whole blocks plus one partial read
1554  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1555  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1556 
1557  for (uint32_t i = 0; i < blockcount; i++) {
1558  const ssize_t last =
1559  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1561  existingSizeLeft += last;
1562  }
1563  if (leftsize) {
1564  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1566  }
1567  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1568  }
1569  }
1570  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1571  if (fileDescriptor_ != -1) {
1572  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1573  close(fileDescriptor_);
  // reset both descriptors so the next call opens a fresh file
1574  file->rawFd_ = fileDescriptor_ = -1;
1575  }
1576  }
1577 }
void read(edm::EventPrincipal &eventPrincipal) override
#define LogDebug(id)

◆ readSupervisor()

void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 765 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, cms::cuda::assert(), cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), evf::EvFDaqDirector::grabNextJsonFromRaw(), mps_fire::i, dqmiodatasetharvest::inf, InputFile, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, createfilelist::int, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), evf::EvFDaqDirector::lumisectionDiscarded(), maxBufferedFiles_, SiStripPI::min, eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::numConcurrentLumis(), evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), 
AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::unlockFULocal(), evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

765  {
      // Supervisor loop: waits until a worker thread and a free chunk are available, obtains the
      // next input file (file-list mode, the non-broker path, or getNextFromFileBroker), pushes
      // lumisection / file / end-of-run entries onto fileQueue_, and assigns chunk reads to worker
      // threads. In single-buffer mode it attaches one chunk to the file and wakes the main thread.
      // NOTE(review): several source lines are absent from this rendered listing (gaps in the
      // embedded line numbers, e.g. 785, 792-793, 831-832, 844); consult the .cc file before
      // relying on the exact control flow shown here.
766  bool stop = false;
767  unsigned int currentLumiSection = 0;
768  //threadInit_.exchange(true,std::memory_order_acquire);
769 
770  {
771  std::unique_lock<std::mutex> lk(startupLock_);
772  startupCv_.notify_one();
773  }
774 
775  uint32_t ls = 0;
776  uint32_t monLS = 1;
777  uint32_t lockCount = 0;
      // NOTE(review): initialised from the double literal 0. although the variable is uint64_t.
778  uint64_t sumLockWaitTimeUs = 0.;
779 
780  while (!stop) {
781  //wait for at least one free thread and chunk
782  int counter = 0;
783 
784  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
786  //report state to monitoring
787  if (fms_) {
788  bool copy_active = false;
789  for (auto j : tid_active_)
790  if (j)
791  copy_active = true;
794  else if (freeChunks_.empty()) {
795  if (copy_active)
797  else
799  } else {
800  if (copy_active)
802  else
804  }
805  }
806  std::unique_lock<std::mutex> lkw(mWakeup_);
807  //sleep until woken up by condition or a timeout
808  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
809  counter++;
810  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
811  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
812  } else {
813  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
814  }
815  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
816  stop = true;
817  break;
818  }
819  }
820  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
821 
822  if (stop)
823  break;
824 
825  //look for a new file
826  std::string nextFile;
827  uint32_t fileSizeIndex;
828  int64_t fileSizeFromMetadata;
829 
830  if (fms_) {
833  }
834 
836  uint16_t rawHeaderSize = 0;
837  uint32_t lsFromRaw = 0;
838  int32_t serverEventsInNewFile = -1;
839  int rawFd = -1;
840 
841  int backoff_exp = 0;
842 
843  //entering loop which tries to grab new file from ramdisk
845  //check if hltd has signalled to throttle input
846  counter = 0;
847  while (daqDirector_->inputThrottled()) {
848  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
849  break;
850 
      // While throttled, look at the current lumisection and the (numConcurrentLumis - 1)
      // preceding ones; the loop below leaves the throttle wait once hasDiscardedLumi is set.
851  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
852  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
853  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
854  bool hasDiscardedLumi = false;
855  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
857  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
858  hasDiscardedLumi = true;
859  break;
860  }
861  }
862  if (hasDiscardedLumi)
863  break;
864 
866 
867  if (!(counter % 50))
868  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
869  usleep(100000);
870  counter++;
871  }
872 
873  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
874  stop = true;
875  break;
876  }
877 
878  assert(rawFd == -1);
879  uint64_t thisLockWaitTimeUs = 0.;
881  if (fileListMode_) {
882  //return LS if LS not set, otherwise return file
883  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
885  uint16_t rawDataType;
887  rawFd,
888  rawHeaderSize,
889  rawDataType,
890  lsFromRaw,
891  serverEventsInNewFile,
892  fileSizeFromMetadata,
893  false,
894  false,
895  false) != 0) {
896  //error
897  setExceptionState_ = true;
898  stop = true;
899  break;
900  }
901  if (!getLSFromFilename_)
902  ls = lsFromRaw;
903  }
904  } else if (!useFileBroker_)
906  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
907  else {
908  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
909  ls,
910  nextFile,
911  rawFd,
912  rawHeaderSize,
913  serverEventsInNewFile,
914  fileSizeFromMetadata,
915  thisLockWaitTimeUs);
916  }
917 
919 
920  //cycle through all remaining LS even if no files get assigned
921  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
923 
924  //monitoring of lock wait time
      // NOTE(review): there are no braces here, so only the += is guarded by the if;
      // lockCount++ runs on every pass through this point. Confirm that is intended.
925  if (thisLockWaitTimeUs > 0.)
926  sumLockWaitTimeUs += thisLockWaitTimeUs;
927  lockCount++;
928  if (ls > monLS) {
929  monLS = ls;
930  if (lockCount)
931  if (fms_)
932  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
933  lockCount = 0;
934  sumLockWaitTimeUs = 0;
935  }
936 
937  //check again for any remaining index/EoLS files after EoR file is seen
940  usleep(100000);
941  //now all files should have appeared in ramdisk, check again if any raw files were left behind
943  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
944  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
946  }
947 
949  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
950  fileQueue_.push(std::move(inf));
951  stop = true;
952  break;
953  }
954 
955  //error from filelocking function
957  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
958  fileQueue_.push(std::move(inf));
959  stop = true;
960  break;
961  }
962  //queue new lumisection
963  if (getLSFromFilename_) {
964  if (ls > currentLumiSection) {
965  if (!useFileBroker_) {
966  //file locking
967  //setMonStateSup(inSupNewLumi);
968  currentLumiSection = ls;
969  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
970  fileQueue_.push(std::move(inf));
971  } else {
972  //new file service
973  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
975  //start transitions from LS specified by env, continue if not reached
976  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
977  //skip file if from earlier LS than specified by env
978  if (rawFd != -1) {
979  close(rawFd);
980  rawFd = -1;
981  }
983  continue;
984  } else {
985  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
986  fileQueue_.push(std::move(inf));
987  }
988  } else if (ls < 100) {
989  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
990  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
991 
992  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
993  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
994  fileQueue_.push(std::move(inf));
995  }
996  } else {
997  //start from current LS
998  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
999  fileQueue_.push(std::move(inf));
1000  }
1001  } else {
1002  //queue all lumisections after last one seen to avoid gaps
1003  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1004  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1005  fileQueue_.push(std::move(inf));
1006  }
1007  }
1008  currentLumiSection = ls;
1009  }
1010  }
1011  //else
1012  if (currentLumiSection > 0 && ls < currentLumiSection) {
1013  edm::LogError("FedRawDataInputSource")
1014  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1015  << ". Aborting execution." << std::endl;
1016  if (rawFd != -1)
1017  close(rawFd);
1018  rawFd = -1;
1019  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1020  fileQueue_.push(std::move(inf));
1021  stop = true;
1022  break;
1023  }
1024  }
1025 
1026  int dbgcount = 0;
1029  dbgcount++;
1030  if (!(dbgcount % 20))
1031  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1032  if (!useFileBroker_)
1033  usleep(100000);
1034  else {
      // The exponent is clamped before use, so the sleep below is at most 100000 * 2^4 us.
1035  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1036  //backoff_exp=0; // disabled!
1037  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1038  usleep(sleeptime);
1039  backoff_exp++;
1040  }
1041  } else
1042  backoff_exp = 0;
1043  }
1044  //end of file grab loop, parse result
1047  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1048 
1049  std::string rawFile;
1050  //file service will report raw extension
1051  if (useFileBroker_ || rawHeaderSize)
1052  rawFile = nextFile;
1053  else {
1054  std::filesystem::path rawFilePath(nextFile);
1055  rawFile = rawFilePath.replace_extension(".raw").string();
1056  }
1057 
1058  struct stat st;
1059  int stat_res = stat(rawFile.c_str(), &st);
1060  if (stat_res == -1) {
1061  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1062  setExceptionState_ = true;
      // NOTE(review): leaves the loop without closing rawFd, which may already be open when
      // the file was obtained via getNextFromFileBroker or parseFRDFileHeader — confirm.
1063  break;
1064  }
1065  uint64_t fileSize = st.st_size;
1066 
1067  if (fms_) {
1071  }
1072  int eventsInNewFile;
1073  if (fileListMode_) {
      // In file-list mode the event count is not known here: 0 for an empty file, -1 otherwise.
1074  if (fileSize == 0)
1075  eventsInNewFile = 0;
1076  else
1077  eventsInNewFile = -1;
1078  } else {
1080  if (!useFileBroker_) {
1081  if (rawHeaderSize) {
1082  int rawFdEmpty = -1;
1083  uint16_t rawHeaderCheck;
1084  bool fileFound;
1085  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1086  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1087  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1089  } else
1090  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1091  } else
1092  eventsInNewFile = serverEventsInNewFile;
1093  assert(eventsInNewFile >= 0);
1094  assert((eventsInNewFile > 0) ==
1095  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1096  }
1097 
1098  if (!singleBufferMode_) {
1099  //calculate number of needed chunks
1100  unsigned int neededChunks = fileSize / eventChunkSize_;
1101  if (fileSize % eventChunkSize_)
1102  neededChunks++;
1103 
1104  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1105  ls,
1106  rawFile,
1107  !fileListMode_,
1108  rawFd,
1109  fileSize,
1110  rawHeaderSize,
1111  neededChunks,
1112  eventsInNewFile,
1113  this));
      // Keep a raw pointer for the worker jobs: ownership moves into fileQueue_ on the next line.
1115  auto newInputFilePtr = newInputFile.get();
1116  fileQueue_.push(std::move(newInputFile));
1117 
1118  for (unsigned int i = 0; i < neededChunks; i++) {
1119  if (fms_) {
1120  bool copy_active = false;
1121  for (auto j : tid_active_)
1122  if (j)
1123  copy_active = true;
1124  if (copy_active)
1126  else
1128  }
1129  //get thread
1130  unsigned int newTid = 0xffffffff;
      // NOTE(review): this wait (and the chunk wait below) polls only quit_threads_, whereas
      // the outer wait loops also check edm::shutdown_flag.
1131  while (!workerPool_.try_pop(newTid)) {
1132  usleep(100000);
1133  if (quit_threads_.load(std::memory_order_relaxed)) {
1134  stop = true;
1135  break;
1136  }
1137  }
1138 
1139  if (fms_) {
1140  bool copy_active = false;
1141  for (auto j : tid_active_)
1142  if (j)
1143  copy_active = true;
1144  if (copy_active)
1146  else
1148  }
1149  InputChunk* newChunk = nullptr;
1150  while (!freeChunks_.try_pop(newChunk)) {
1151  usleep(100000);
1152  if (quit_threads_.load(std::memory_order_relaxed)) {
1153  stop = true;
1154  break;
1155  }
1156  }
1157 
1158  if (newChunk == nullptr) {
1159  //return unused tid if we received shutdown (nullptr chunk)
1160  if (newTid != 0xffffffff)
1161  workerPool_.push(newTid);
1162  stop = true;
1163  break;
1164  }
1165  if (stop)
1166  break;
1168 
1169  std::unique_lock<std::mutex> lk(mReader_);
1170 
      // Every chunk covers eventChunkSize_ bytes except possibly the last, which gets the remainder.
1171  unsigned int toRead = eventChunkSize_;
1172  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1173  toRead = fileSize % eventChunkSize_;
1174  newChunk->reset(i * eventChunkSize_, toRead, i);
1175 
1176  workerJob_[newTid].first = newInputFilePtr;
1177  workerJob_[newTid].second = newChunk;
1178 
1179  //wake up the worker thread
1180  cvReader_[newTid]->notify_one();
1181  }
1182  } else {
1183  if (!eventsInNewFile) {
      // NOTE(review): tests rawFd for non-zero, so it is also true for the -1 "no descriptor"
      // value used elsewhere in this function (cf. the rawFd != -1 checks); rawFd != -1 was
      // probably intended.
1184  if (rawFd) {
1185  close(rawFd);
1186  rawFd = -1;
1187  }
1188  //still queue file for lumi update
1189  std::unique_lock<std::mutex> lkw(mWakeup_);
1190  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1191  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1192  ls,
1193  rawFile,
1194  !fileListMode_,
1195  rawFd,
1196  fileSize,
1197  rawHeaderSize,
1198  (rawHeaderSize > 0),
1199  0,
1200  this));
1202  fileQueue_.push(std::move(newInputFile));
1203  cvWakeup_.notify_one();
1204  break;
1205  }
1206  //in single-buffer mode put single chunk in the file and let the main thread read the file
1207  InputChunk* newChunk = nullptr;
1208  //should be available immediately
1209  while (!freeChunks_.try_pop(newChunk)) {
1210  usleep(100000);
1211  if (quit_threads_.load(std::memory_order_relaxed)) {
1212  stop = true;
1213  break;
1214  }
1215  }
1216 
1217  if (newChunk == nullptr) {
1218  stop = true;
1219  }
1220 
1221  if (stop)
1222  break;
1223 
1224  std::unique_lock<std::mutex> lkw(mWakeup_);
1225 
1226  unsigned int toRead = eventChunkSize_;
1227  if (fileSize % eventChunkSize_)
1228  toRead = fileSize % eventChunkSize_;
1229  newChunk->reset(0, toRead, 0);
      // The chunk is flagged complete here although nothing has been read into it in this
      // function; per the comment above, the main thread reads the file in this mode.
1230  newChunk->readComplete_ = true;
1231 
1232  //push file and wakeup main thread
1233  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1234  ls,
1235  rawFile,
1236  !fileListMode_,
1237  rawFd,
1238  fileSize,
1239  rawHeaderSize,
1240  1,
1241  eventsInNewFile,
1242  this));
1243  newInputFile->chunks_[0] = newChunk;
1245  fileQueue_.push(std::move(newInputFile));
1246  cvWakeup_.notify_one();
1247  }
1248  }
1249  }
      // Shutdown: take each worker id from workerPool_ as it becomes available, set its quit
      // flag and wake it, then join and delete every thread object.
1251  //make sure threads finish reading
1252  unsigned numFinishedThreads = 0;
1253  while (numFinishedThreads < workerThreads_.size()) {
1254  unsigned tid = 0;
1255  while (!workerPool_.try_pop(tid)) {
1256  usleep(10000);
1257  }
1258  std::unique_lock<std::mutex> lk(mReader_);
1259  thread_quit_signal[tid] = true;
1260  cvReader_[tid]->notify_one();
1261  numFinishedThreads++;
1262  }
1263  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1264  workerThreads_[i]->join();
1265  delete workerThreads_[i];
1266  }
1267 }
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
volatile std::atomic< bool > shutdown_flag
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
evf::EvFDaqDirector * daqDirector_
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)

◆ readWorker()

void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1269 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), InputChunk::buf_, cvReader_, detectedFRDversion_, change_name::diff, mps_fire::end, eventChunkBlock_, geometryDiff::file, InputChunk::fileIndex_, dqmdumpme::first, FRDHeaderMaxVersion, mps_fire::i, caHitNtupletGeneratorKernels::if(), dqmdumpme::last, LogDebug, SiStripPI::min, mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1269  {
      // Worker thread body for reader `tid`: each pass marks the thread inactive, pushes tid onto
      // workerPool_, waits on cvReader_[tid], then reads the chunk assigned in workerJob_[tid]
      // from the file in pieces of at most eventChunkBlock_ bytes and finally sets
      // chunk->readComplete_. Returns when thread_quit_signal[tid] is set after a wake-up.
      // NOTE(review): some source lines are absent from this rendered listing (1352, 1385, 1405).
1270  bool init = true;
1271  threadInit_.exchange(true, std::memory_order_acquire);
1272 
1273  while (true) {
1274  tid_active_[tid] = false;
1275  std::unique_lock<std::mutex> lk(mReader_);
1276  workerJob_[tid].first = nullptr;
      // NOTE(review): exact duplicate of the previous statement; resetting
      // workerJob_[tid].second was probably intended — confirm.
1277  workerJob_[tid].first = nullptr;
1278 
1279  assert(!thread_quit_signal[tid]); //should never get it here
1280  workerPool_.push(tid);
1281 
1282  if (init) {
      // This inner `lk` (on startupLock_) shadows the outer `lk` (on mReader_) inside this block.
1283  std::unique_lock<std::mutex> lk(startupLock_);
1284  init = false;
1285  startupCv_.notify_one();
1286  }
1287  cvReader_[tid]->wait(lk);
1288 
1289  if (thread_quit_signal[tid])
1290  return;
1291  tid_active_[tid] = true;
1292 
1293  InputFile* file;
1294  InputChunk* chunk;
1295 
1296  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1297 
1298  file = workerJob_[tid].first;
1299  chunk = workerJob_[tid].second;
1300 
1301  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1302  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1303 
1304  //if only one worker thread exists, use single fd for all operations
1305  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1306  int fileDescriptor;
1307  bool fileOpenedHere = false;
1308 
1309  if (numConcurrentReads_ == 1) {
1310  fileDescriptor = file->rawFd_;
1311  if (fileDescriptor == -1) {
1312  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1313  fileOpenedHere = true;
1314  file->rawFd_ = fileDescriptor;
1315  }
1316  } else {
1317  if (chunk->offset_ == 0) {
1318  fileDescriptor = file->rawFd_;
1319  file->rawFd_ = -1;
1320  if (fileDescriptor == -1) {
1321  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1322  fileOpenedHere = true;
1323  }
1324  } else {
1325  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1326  fileOpenedHere = true;
1327  }
1328  }
1329 
1330  if (fileDescriptor < 0) {
1331  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1332  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1333  setExceptionState_ = true;
      // Goes back to the top of the loop: the thread re-registers in workerPool_ and
      // chunk->readComplete_ is not set for this job.
1334  continue;
1335  }
1336  if (fileOpenedHere) { //fast forward to this chunk position
1337  off_t pos = 0;
1338  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1339  if (pos == -1) {
1340  edm::LogError("FedRawDataInputSource")
1341  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1342  << chunk->offset_ << " error: " << strerror(errno);
1343  setExceptionState_ = true;
      // NOTE(review): in the multi-reader branch the descriptor opened above is held only in
      // the local variable, so this continue appears to leave it open — confirm.
1344  continue;
1345  }
1346  }
1347 
1348  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1349  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1350 
      // `skipped` remembers the initial offset into the buffer (rawHeaderSize_ or 0) so the
      // size check inside the loop can discount it.
1351  unsigned int skipped = bufferLeft;
1353  for (unsigned int i = 0; i < readBlocks_; i++) {
1354  ssize_t last;
1355 
1356  //protect against reading into next block
1357  last = ::read(fileDescriptor,
1358  (void*)(chunk->buf_ + bufferLeft),
1359  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1360 
1361  if (last < 0) {
1362  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1363  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1364  setExceptionState_ = true;
1365  break;
1366  }
1367  if (last > 0)
1368  bufferLeft += last;
1369  if (last < eventChunkBlock_) { //last read
1370  //check if this is last block, then total read size must match file size
1371  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1372  edm::LogError("FedRawDataInputSource")
1373  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1374  << " expectedChunkSize:" << chunk->usedSize_
1375  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1376  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1377  setExceptionState_ = true;
1378  }
1379  break;
1380  }
1381  }
      // NOTE(review): this path skips the close() calls below, so the descriptor used for the
      // failed read appears to stay open; setExceptionState_ is also not reset anywhere in this
      // function, so once set every later job takes this branch — confirm both are intended.
1382  if (setExceptionState_)
1383  continue;
1384 
      // `start` and `end` are defined on lines that are absent from this listing (1352 and 1385).
1386  auto diff = end - start;
1387  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1388  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1389  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1390  << " GB/s)";
1391 
1392  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1393  close(fileDescriptor);
1394  fileDescriptor = -1;
1395  if (numConcurrentReads_ == 1)
1396  file->rawFd_ = -1;
1397  }
1398  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1399  close(fileDescriptor);
1400 
1401  //detect FRD event version. Skip file Header if it exists
1402  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1403  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1404  }
1406  chunk->readComplete_ =
1407  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1408  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1409  }
1410 }
Definition: start.py:1
void read(edm::EventPrincipal &eventPrincipal) override
constexpr size_t FRDHeaderMaxVersion
tbb::concurrent_queue< unsigned int > workerPool_
std::vector< ReaderInfo > workerJob_
Log< level::Error, false > LogError
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
unsigned char * buf_
unsigned int fileIndex_
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
std::atomic< bool > threadInit_
std::vector< unsigned int > thread_quit_signal
std::vector< unsigned int > tid_active_
#define LogDebug(id)

◆ reportEventsThisLumiInSource()

void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1579 of file FedRawDataInputSource.cc.

References events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextEvent().

1579  {
      // Adds `events` to the count stored for `lumi` in sourceEventsReport_, holding monlock_
      // for the duration of the update.
1580  std::lock_guard<std::mutex> lock(monlock_);
1581  auto itr = sourceEventsReport_.find(lumi);
1582  if (itr != sourceEventsReport_.end())
1583  itr->second += events;
1584  else
      // NOTE(review): the else statement (source line 1585) is absent from this rendered listing;
      // presumably it creates the entry for a lumi not yet in the map — confirm in the .cc file.
1586 }
std::map< unsigned int, unsigned int > sourceEventsReport_
int events

◆ rewind_()

void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 763 of file FedRawDataInputSource.cc.

763 {}
// Empty body: this override of edm::InputSource::rewind_ performs no action.

◆ setMonState()

void FedRawDataInputSource::setMonState ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1417 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInState(), and edm::InputSource::state().

Referenced by InputFile::advance(), checkNext(), getNextEvent(), and read().

1417  {
      // Forwards `state` to the monitoring service via setInState(); does nothing when fms_ is null.
1418  if (fms_)
1419  fms_->setInState(state);
1420 }
ItemType state() const
Definition: InputSource.h:332
evf::FastMonitoringService * fms_
void setInState(FastMonState::InputState inputState)

◆ setMonStateSup()

void FedRawDataInputSource::setMonStateSup ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1422 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInStateSup(), and edm::InputSource::state().

Referenced by readSupervisor().

1422  {
      // Does nothing when fms_ is null.
      // NOTE(review): the guarded statement (source line 1424) is absent from this rendered
      // listing; going by the References list it presumably calls fms_->setInStateSup(state) —
      // confirm in the .cc file.
1423  if (fms_)
1425 }
void setInStateSup(FastMonState::InputState inputState)
ItemType state() const
Definition: InputSource.h:332
evf::FastMonitoringService * fms_

◆ threadError()

void FedRawDataInputSource::threadError ( )
private

Definition at line 1412 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1412  {
      // Sets quit_threads_ first, so loops that poll it (e.g. in readSupervisor()) stop, and
      // then always throws cms::Exception; this function never returns normally.
1413  quit_threads_ = true;
1414  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1415 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 44 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ InputFile

friend class InputFile
friend

Definition at line 43 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ bufferInputRead_

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

◆ checkEvery_

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 172 of file FedRawDataInputSource.h.

Referenced by read().

◆ chunkIsFree_

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentFile_

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

◆ currentFileIndex_

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentLumiSection_

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), maybeOpenNewLumiSection(), and read().

◆ cvReader_

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

◆ cvWakeup_

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 176 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

◆ daqDirector_

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = nullptr
private

◆ daqProvenanceHelper_

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

◆ defPath_

std::string FedRawDataInputSource::defPath_
private

Definition at line 88 of file FedRawDataInputSource.h.

◆ detectedFRDversion_

uint16_t FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

◆ event_

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNext(), fillFEDRawDataCollection(), getNextEvent(), and read().

◆ eventChunkBlock_

unsigned int FedRawDataInputSource::eventChunkBlock_
private

◆ eventChunkSize_

unsigned int FedRawDataInputSource::eventChunkSize_
private

◆ eventID_

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by read().

◆ eventRunNumber_

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

◆ eventsThisLumi_

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and read().

◆ eventsThisRun_

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 129 of file FedRawDataInputSource.h.

◆ fileDeleteLock_

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

◆ fileDescriptor_

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

◆ fileListIndex_

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNext(), FedRawDataInputSource(), getFile(), and read().

◆ fileListMode_

const bool FedRawDataInputSource::fileListMode_
private

◆ fileNames_

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile() and initFileList().

◆ fileNamesToDelete_

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 168 of file FedRawDataInputSource.h.

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private

◆ fms_

evf::FastMonitoringService* FedRawDataInputSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

◆ fuOutputDir_

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 114 of file FedRawDataInputSource.h.

◆ getLSFromFilename_

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and readSupervisor().

◆ GTPEventID_

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ L1EventID_

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by checkNext() and read().

◆ loopModeIterationInc_

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ MAXTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ MINTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ monlock_

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 186 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ mReader_

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

◆ mWakeup_

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ nStreams_

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by read().

◆ numBuffers_

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ numConcurrentReads_

unsigned int FedRawDataInputSource::numConcurrentReads_
private

◆ processHistoryID_

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

◆ quit_threads_

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

◆ readBlocks_

unsigned int FedRawDataInputSource::readBlocks_
private

◆ readingFilesCount_

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ runNumber_

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNext() and FedRawDataInputSource().

◆ setExceptionState_

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

◆ singleBufferMode_

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 185 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ startupCv_

std::condition_variable FedRawDataInputSource::startupCv_
private

◆ startupLock_

std::mutex FedRawDataInputSource::startupLock_
private

◆ streamFileTracker_

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by read().

◆ tcds_pointer_

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ testTCDSFEDRange_

const std::vector<unsigned int> FedRawDataInputSource::testTCDSFEDRange_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ thread_quit_signal

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

◆ threadInit_

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 183 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and readWorker().

◆ tid_active_

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool FedRawDataInputSource::useFileBroker_
private

◆ useL1EventID_

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by read().

◆ verifyChecksum_

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

◆ workerJob_

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by readSupervisor() and readWorker().

◆ workerThreads_

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private