CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSource & operator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks the current event as cached (sets the flag returned by eventCached()). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &rawData, bool &tcdsInRange)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint16_t detectedFRDversion_ = 0
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
 
uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 42 of file FedRawDataInputSource.h.

Member Typedef Documentation

◆ ReaderInfo

Definition at line 140 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

◆ FedRawDataInputSource()

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 51 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), MAXTCDSuTCAFEDID_, MINTCDSuTCAFEDID_, numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  eventID_(),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  //prepare data shared by threads
159  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
160  thread_quit_signal.push_back(false);
161  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
162  cvReader_.push_back(std::make_unique<std::condition_variable>());
163  tid_active_.push_back(0);
164  }
165 
166  //start threads
167  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
168  //wait for each thread to complete initialization
169  std::unique_lock<std::mutex> lk(startupLock_);
170  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
171  startupCv_.wait(lk);
172  }
173 
174  runAuxiliary()->setProcessHistoryID(processHistoryID_);
175 }
std::vector< std::string > fileNames_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
bool useFileBroker() const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::pair< InputFile *, InputChunk * > ReaderInfo
assert(be >=bs)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static Timestamp beginOfTime()
Definition: Timestamp.h:77
const std::vector< unsigned int > testTCDSFEDRange_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
std::condition_variable startupCv_
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
void setInState(FastMonState::InputState inputState)

◆ ~FedRawDataInputSource()

FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 177 of file FedRawDataInputSource.cc.

References currentFile_, cvReader_, evf::FastMonitoringService::exceptionDetected(), filesToDelete_, fms_, mps_fire::i, evf::FastMonitoringService::isExceptionOnData(), mReader_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

177  {
178  quit_threads_ = true;
179 
180  //delete any remaining open files
181  if (!fms_ || !fms_->exceptionDetected()) {
182  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
183  it->second.reset();
184  } else {
185  //skip deleting files with exception
186  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
187  //it->second->unsetDeleteFile();
188  if (fms_->isExceptionOnData(it->second->lumi_))
189  it->second->unsetDeleteFile();
190  else
191  it->second.reset();
192  }
193  //disable deleting current file with exception
194  if (currentFile_.get())
195  if (fms_->isExceptionOnData(currentFile_->lumi_))
196  currentFile_->unsetDeleteFile();
197  }
198 
200  readSupervisorThread_->join();
201  } else {
202  //join aux threads in case the supervisor thread was not started
203  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
204  std::unique_lock<std::mutex> lk(mReader_);
205  thread_quit_signal[i] = true;
206  cvReader_[i]->notify_one();
207  lk.unlock();
208  workerThreads_[i]->join();
209  delete workerThreads_[i];
210  }
211  }
212 }
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
bool isExceptionOnData(unsigned int ls)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::vector< std::thread * > workerThreads_
std::vector< unsigned int > thread_quit_signal
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::unique_ptr< InputFile > currentFile_

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next FedRawDataInputSource::checkNext ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 239 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

239  {
241  //this thread opens new files and dispatches reading to worker readers
242  std::unique_lock<std::mutex> lk(startupLock_);
243  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
245  startupCv_.wait(lk);
246  }
247  //signal hltd to start event accounting
248  if (!currentLumiSection_)
251  switch (nextEvent()) {
253  //maybe create EoL file in working directory before ending run
254  struct stat buf;
255  if (!useFileBroker_ && currentLumiSection_ > 0) {
256  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
257  if (eolFound) {
259  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
260  if (!found) {
262  int eol_fd =
263  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
271  if (!eorFound) {
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
273  O_RDWR | O_CREAT,
274  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_ = 0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return Next::kStop;
282  }
284  //this is not reachable
285  return Next::kEvent;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return Next::kEvent;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_ = 0;
298  }
299  }
302  else
303  eventRunNumber_ = event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return Next::kEvent;
309  }
310  }
311 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoRFilePathOnFU() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Marks the current event as cached (sets the flag returned by eventCached()).
Definition: InputSource.h:362
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::EvFDaqDirector::FileStatus nextEvent()

◆ exceptionState()

bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 70 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance().

◆ fillDescriptions()

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 214 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), and submitPVResolutionJobs::desc.

214  {
216  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
217  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
218  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
219  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
220  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
221  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
222  ->setComment("Maximum number of simultaneously buffered raw files");
223  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
224  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
225  desc.addUntracked<bool>("verifyChecksum", true)
226  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
227  desc.addUntracked<bool>("useL1EventID", false)
228  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
229  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
230  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
231  desc.addUntracked<bool>("fileListMode", false)
232  ->setComment("Use fileNames parameter to directly specify raw files to open");
233  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
234  ->setComment("file list used when fileListMode is enabled");
235  desc.setAllowAnything();
236  descriptions.add("source", desc);
237 }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ fillFEDRawDataCollection()

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData,
bool &  tcdsInRange 
)
private

Definition at line 708 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MAXTCDSuTCAFEDID, MAXTCDSuTCAFEDID_, FEDNumbering::MINTCDSuTCAFEDID, MINTCDSuTCAFEDID_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, l1tstage2_dqm_sourceclient-live_cfg::rawData, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and hcalRecHitTable_cff::time.

Referenced by read().

708  {
710  timeval stv;
711  gettimeofday(&stv, nullptr);
712  time = stv.tv_sec;
713  time = (time << 32) + stv.tv_usec;
714  edm::Timestamp tstamp(time);
715 
716  uint32_t eventSize = event_->eventSize();
717  unsigned char* event = (unsigned char*)event_->payload();
718  GTPEventID_ = 0;
719  tcds_pointer_ = nullptr;
720  tcdsInRange = false;
721  uint16_t selectedTCDSFed = 0;
722  while (eventSize > 0) {
723  assert(eventSize >= FEDTrailer::length);
724  eventSize -= FEDTrailer::length;
725  const FEDTrailer fedTrailer(event + eventSize);
726  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
727  assert(eventSize >= fedSize - FEDHeader::length);
728  eventSize -= (fedSize - FEDHeader::length);
729  const FEDHeader fedHeader(event + eventSize);
730  const uint16_t fedId = fedHeader.sourceID();
732  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
733  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
734  if (!selectedTCDSFed) {
735  selectedTCDSFed = fedId;
736  tcds_pointer_ = event + eventSize;
738  tcdsInRange = true;
739  }
740  } else
741  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
742  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
743  }
745  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
746  GTPEventID_ = evf::evtn::get(event + eventSize, true);
747  else
748  GTPEventID_ = evf::evtn::get(event + eventSize, false);
749  //evf::evtn::evm_board_setformat(fedSize);
750  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
751  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
752  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
753  }
754  //take event ID from GTPE FED
756  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
757  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
758  }
759  }
760  FEDRawData& fedData = rawData.FEDData(fedId);
761  fedData.resize(fedSize);
762  memcpy(fedData.data(), event + eventSize, fedSize);
763  }
764  assert(eventSize == 0);
765 
766  return tstamp;
767 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
static const uint32_t length
Definition: FEDTrailer.h:57
unsigned int get(const unsigned char *, bool)
static const uint32_t length
Definition: FEDHeader.h:54
assert(be >=bs)
void resize(size_t newsize)
Definition: FEDRawData.cc:28
unsigned long long TimeValue_t
Definition: Timestamp.h:21
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
Definition: event.py:1

◆ getEventReport()

std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1598 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

//Looks up the entry stored for `lumi` in sourceEventsReport_ while holding monlock_.
//Returns {true, stored value} when an entry exists (the entry is removed before returning
//if `erase` is set), otherwise {false, 0}.
1598  {
1599  std::lock_guard<std::mutex> lock(monlock_);
1600  auto itr = sourceEventsReport_.find(lumi);
1601  if (itr != sourceEventsReport_.end()) {
//copy the value out first: erase() below invalidates itr
1602  std::pair<bool, unsigned int> ret(true, itr->second);
1603  if (erase)
1604  sourceEventsReport_.erase(itr);
1605  return ret;
1606  } else
1607  return std::pair<bool, unsigned int>(false, 0);
1608 }
ret
prodAgent to be discontinued
std::map< unsigned int, unsigned int > sourceEventsReport_

◆ getFile()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1645 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

//Hands out the next entry of fileNames_ (indexed by fileListIndex_) through `nextFile` and
//derives the lumisection number `ls` from the file stem, or from loopModeIterationInc_ in loop mode.
//`fsize` and `lockWaitTime` are only forwarded to the recursive call in the visible code.
//NOTE(review): this listing skips source lines 1670, 1673 and 1676, so the statements there
//(presumably the return statements and the loop-iteration bookkeeping) are not shown.
1648  {
1649  if (fileListIndex_ < fileNames_.size()) {
1650  nextFile = fileNames_[fileListIndex_];
//drop a leading "file://" or "file:" prefix
1651  if (nextFile.find("file://") == 0)
1652  nextFile = nextFile.substr(7);
1653  else if (nextFile.find("file:") == 0)
1654  nextFile = nextFile.substr(5);
1655  std::filesystem::path fileName = nextFile;
1656  std::string fileStem = fileName.stem().string();
//NOTE(review): find() returns a position, not a bool. This condition is false only when "ls"
//sits at index 0 and true when "ls" is absent (npos), in which case npos + 2 wraps around to 1.
//Confirm whether a comparison against std::string::npos was intended.
1657  if (fileStem.find("ls"))
1658  fileStem = fileStem.substr(fileStem.find("ls") + 2);
//NOTE(review): same pattern; when '_' is absent substr(0, npos) keeps the whole string
1659  if (fileStem.find('_'))
1660  fileStem = fileStem.substr(0, fileStem.find('_'));
1661 
//std::stoul throws if what is left of the stem does not start with a number
1662  if (!fileListLoopMode_)
1663  ls = std::stoul(fileStem);
1664  else //always starting from LS 1 in loop mode
1665  ls = 1 + loopModeIterationInc_;
1666 
1667  //fsize = 0;
1668  //lockWaitTime = 0;
1669  fileListIndex_++;
1671  } else {
1672  if (!fileListLoopMode_)
1674  else {
1675  //loop through files until interrupted
//restart from the first file and resolve it through a recursive call
1677  fileListIndex_ = 0;
1678  return getFile(ls, nextFile, fsize, lockWaitTime);
1679  }
1680  }
1681 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getNextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 359 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, FRDHeaderMaxVersion, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, setMonState(), singleBufferMode_, mps_update::status, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

//Prepares event_ (an FRDEventMsgView) for the next event of the current input file, popping a new
//file from fileQueue_ when there is no current one, and optionally verifying the event checksum.
//Throws cms::Exception on run abort, truncated input, oversized events, unknown FRD version,
//event-count mismatch and checksum mismatch (see the individual throw sites below).
//NOTE(review): this listing has gaps in its line numbering (e.g. 363-364, 369, 372-373, 393, 629),
//so several statements - including most return statements and monitoring calls - are not shown.
359  {
360  if (setExceptionState_)
361  threadError();
362  if (!currentFile_.get()) {
365  if (!fileQueue_.try_pop(currentFile_)) {
366  //sleep until wakeup (only in single-buffer mode) or timeout
367  std::unique_lock<std::mutex> lkw(mWakeup_);
368  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
370  }
371  status = currentFile_->status_;
374  currentFile_.reset();
375  return status;
376  } else if (status == evf::EvFDaqDirector::runAbort) {
377  throw cms::Exception("FedRawDataInputSource::getNextEvent")
378  << "Run has been aborted by the input source reader thread";
379  } else if (status == evf::EvFDaqDirector::newLumi) {
381  if (getLSFromFilename_) {
382  if (currentFile_->lumi_ > currentLumiSection_) {
384  eventsThisLumi_ = 0;
386  }
387  } else { //let this be picked up from next event
389  }
390  currentFile_.reset();
391  return status;
392  } else if (status == evf::EvFDaqDirector::newFile) {
394  } else
395  assert(false);
396  }
398 
399  //file is empty
400  if (!currentFile_->fileSize_) {
402  //try to open new lumi
403  assert(currentFile_->nChunks_ == 0);
404  if (getLSFromFilename_)
405  if (currentFile_->lumi_ > currentLumiSection_) {
407  eventsThisLumi_ = 0;
409  }
410  //immediately delete empty file
411  currentFile_.reset();
413  }
414 
415  //file is finished
416  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
418  //release last chunk (it is never released elsewhere)
419  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
//a negative nEvents_ disables this consistency check
420  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
421  throw cms::Exception("FedRawDataInputSource::getNextEvent")
422  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
423  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
424  }
425  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
426  if (singleBufferMode_) {
427  std::unique_lock<std::mutex> lkw(mWakeup_);
428  cvWakeup_.notify_one();
429  }
430  bufferInputRead_ = 0;
432  //put the file in pending delete list;
433  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
434  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
435  } else {
436  //in single-thread and stream jobs, events are already processed
437  currentFile_.reset();
438  }
440  }
441 
442  //handle RAW file header
//only applies at the very start of a file that carries a raw header
443  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
444  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
//strictly smaller than the header is an error; exactly equal is a header-only file (warning)
445  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
446  throw cms::Exception("FedRawDataInputSource::getNextEvent")
447  << "Premature end of input file while reading file header";
448 
449  edm::LogWarning("FedRawDataInputSource")
450  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
451  if (getLSFromFilename_)
452  if (currentFile_->lumi_ > currentLumiSection_) {
454  eventsThisLumi_ = 0;
456  }
457  }
458 
459  //advance buffer position to skip file header (chunk will be acquired later)
460  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
461  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
462  }
463 
464  //file is too short
465  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
466  throw cms::Exception("FedRawDataInputSource::getNextEvent")
467  << "Premature end of input file while reading event header";
468  }
469  if (singleBufferMode_) {
470  //should already be there
//poll every 10 ms until the chunk is available, surfacing reader-thread errors in between
472  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
473  usleep(10000);
474  if (currentFile_->parent_->exceptionState() || setExceptionState_)
475  currentFile_->parent_->threadError();
476  }
478 
479  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
480 
481  //conditions when read amount is not sufficient for the header to fit
485 
//a value of 0 means the version is not known yet: take it from the first 16 bits at dataPosition
486  if (detectedFRDversion_ == 0) {
487  detectedFRDversion_ = *((uint16_t*)dataPosition);
489  throw cms::Exception("FedRawDataInputSource::getNextEvent")
490  << "Unknown FRD version -: " << detectedFRDversion_;
492  }
493 
494  //recalculate chunk position
495  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
497  throw cms::Exception("FedRawDataInputSource::getNextEvent")
498  << "Premature end of input file while reading event header";
499  }
500  }
501 
502  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
503  if (event_->size() > eventChunkSize_) {
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
506  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
507  << " bytes";
508  }
509 
//event_->size() minus the FRDHeaderVersionSize entry for the detected version
//(presumably the header size, leaving the payload size - confirm)
510  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
511 
512  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
513  throw cms::Exception("FedRawDataInputSource::getNextEvent")
514  << "Premature end of input file while reading event data";
515  }
516  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
518  //recalculate chunk position
519  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
520  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
521  }
522  currentFile_->bufferPosition_ += event_->size();
523  currentFile_->chunkPosition_ += event_->size();
524  //last chunk is released when this function is invoked next time
525 
526  }
527  //multibuffer mode:
528  else {
529  //wait for the current chunk to become added to the vector
531  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
532  usleep(10000);
533  if (setExceptionState_)
534  threadError();
535  }
537 
538  //check if header is at the boundary of two chunks
539  chunkIsFree_ = false;
540  unsigned char* dataPosition;
541 
542  //read header, copy it to a single chunk if necessary
544  throw cms::Exception("FedRawDataInputSource::getNextEvent")
545  << "Premature end of input file (missing:"
547  << ") while reading event data for next event header";
//advance() sets dataPosition; its result is used below as "the header crossed a chunk boundary"
548  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
549 
550  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
551  if (event_->size() > eventChunkSize_) {
552  throw cms::Exception("FedRawDataInputSource::getNextEvent")
553  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
554  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
555  << " bytes";
556  }
557 
558  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
559 
560  if (currentFile_->fileSizeLeft() < msgSize) {
561  throw cms::Exception("FedRawDataInputSource::getNextEvent")
562  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
563  << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
564  }
565 
566  if (chunkEnd) {
567  //header was at the chunk boundary, we will have to move payload as well
568  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
569  chunkIsFree_ = true;
570  } else {
571  //header was contiguous, but check if payload fits the chunk
572  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
573  //rewind to header start position
575  //copy event to a chunk start and move pointers
576 
578 
579  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
580 
582 
583  assert(chunkEnd);
584  chunkIsFree_ = true;
585  //header is moved
586  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
587  } else {
588  //everything is in a single chunk, only move pointers forward
589  chunkEnd = currentFile_->advance(dataPosition, msgSize);
590  assert(!chunkEnd);
591  chunkIsFree_ = false;
592  }
593  }
594  //sanity-check check that the buffer position has not exceeded file size after preparing event
595  if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
596  throw cms::Exception("FedRawDataInputSource::getNextEvent")
597  << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
598  << " after reading last event declared size of " << event_->size() << " bytes";
599  }
600  } //end multibuffer mode
602 
//checksum selection: version >= 5 is checked with crc32c, versions 3 and 4 with Adler32,
//older versions are not checked; nothing is checked unless verifyChecksum_ is set
603  if (verifyChecksum_ && event_->version() >= 5) {
604  uint32_t crc = 0;
605  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
606  if (crc != event_->crc32c()) {
607  if (fms_)
609  throw cms::Exception("FedRawDataInputSource::getNextEvent")
610  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
611  << crc;
612  }
613  } else if (verifyChecksum_ && event_->version() >= 3) {
614  uint32_t adler = adler32(0L, Z_NULL, 0);
615  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
616 
617  if (adler != event_->adler32()) {
618  if (fms_)
620  throw cms::Exception("FedRawDataInputSource::getNextEvent")
621  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
622  << adler;
623  }
624  }
626 
627  currentFile_->nProcessed_++;
628 
630 }
std::condition_variable cvWakeup_
constexpr size_t FRDHeaderMaxVersion
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void setMonState(evf::FastMonState::InputState state)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::unique_ptr< FRDEventMsgView > event_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511

◆ initFileList()

long FedRawDataInputSource::initFileList ( )
private

Definition at line 1610 of file FedRawDataInputSource.cc.

References a, b, mps_fire::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, fileNames_, castor_dqm_sourceclient_file_cfg::path, jetUpdater_cfi::sort, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

//Sorts fileNames_ and tries to autodetect the run number from a file stem of the form
//"run<number>[_...]". Returns the parsed number, or -1 when fileNames_ is empty, the stem does
//not start with "run", or the number cannot be parsed.
//NOTE(review): listing line 1621, where `fileName` is defined, is not shown here.
1610  {
//order by the part of each name starting at its last '/' (the whole name if there is none),
//ascending; the parameters are taken by value so the copies can be trimmed in place
1611  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1612  if (a.rfind('/') != std::string::npos)
1613  a = a.substr(a.rfind('/'));
1614  if (b.rfind('/') != std::string::npos)
1615  b = b.substr(b.rfind('/'));
1616  return b > a;
1617  });
1618 
1619  if (!fileNames_.empty()) {
1620  //get run number from first file in the vector
1622  std::string fileStem = fileName.stem().string();
1623  if (fileStem.find("file://") == 0)
1624  fileStem = fileStem.substr(7);
1625  else if (fileStem.find("file:") == 0)
1626  fileStem = fileStem.substr(5);
1627  auto end = fileStem.find('_');
1628 
1629  if (fileStem.find("run") == 0) {
//if no '_' was found, end is npos and substr() simply takes the rest of the stem
1630  std::string runStr = fileStem.substr(3, end - 3);
1631  try {
1632  //get long to support test run numbers < 2^32
1633  long rval = std::stol(runStr);
1634  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1635  return rval;
1636  } catch (const std::exception&) {
//parse failure is not fatal: warn and fall through to the -1 return
1637  edm::LogWarning("FedRawDataInputSource")
1638  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1639  }
1640  }
1641  }
1642  return -1;
1643 }
std::vector< std::string > fileNames_
Log< level::Info, false > LogInfo
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
Log< level::Warning, false > LogWarning

◆ maybeOpenNewLumiSection()

void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 313 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNext(), and getNextEvent().

//Does nothing unless there is no current luminosity block auxiliary or its lumi number differs
//from `lumiSection`. Otherwise it handles the EoLS/BoLS files (only when useFileBroker_ is false),
//updates currentLumiSection_, installs a new luminosity block auxiliary and logs the transition.
//NOTE(review): listing lines 317, 321, 326, 334 and 340 are not shown here; `fuEoLS` and
//`lumiBlockAuxiliary` are defined on those lines.
313  {
314  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
315  if (!useFileBroker_) {
316  if (currentLumiSection_ > 0) {
318  struct stat buf;
//stat() is used purely as an existence test for the fuEoLS path
319  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
320  if (!found) {
//NOTE(review): the descriptor returned by open() is passed to close() without checking for -1
322  int eol_fd =
323  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
324  close(eol_fd);
325  daqDirector_->createBoLSFile(lumiSection, false);
327  }
328  } else
329  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
330  }
331 
332  currentLumiSection_ = lumiSection;
333 
335 
//lumisection open time: current wall-clock time expressed as seconds * 1000000 + microseconds
336  timeval tv;
337  gettimeofday(&tv, nullptr);
338  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
339 
341  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
342 
343  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
344  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
345 
346  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
347  }
348 }
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
edm::ProcessHistoryID processHistoryID_
Log< level::Info, false > LogInfo
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
evf::EvFDaqDirector * daqDirector_
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:235
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ nextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 350 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNext().

//Returns `status` once the loop below ends.
//NOTE(review): listing lines 351-352, which declare `status` and open the loop, are not shown
//here (presumably the loop repeats getNextEvent() while no file is available - confirm).
350  {
//stop looping as soon as edm::shutdown_flag is set
353  if (edm::shutdown_flag.load(std::memory_order_relaxed))
354  break;
355  }
356  return status;
357 }
volatile std::atomic< bool > shutdown_flag
def load(fileName)
Definition: svgfig.py:547
evf::EvFDaqDirector::FileStatus getNextEvent()

◆ read()

void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 632 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, runTheMatrix::const, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), event_, eventID_, eventRunNumber_, eventsThisLumi_, Exception, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, evf::FastMonitoringService::isExceptionOnData(), L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), l1tstage2_dqm_sourceclient-live_cfg::rawData, setMonState(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), readNextChunkIntoBuffer(), and readWorker().

//Fills a FEDRawDataCollection for the current event, creates the event through makeEvent() using
//one of three ways of building the event auxiliary, then does per-event bookkeeping: event counter,
//per-stream file tracking, periodic clean-up of filesToDelete_ and release of a free chunk.
//Throws cms::Exception when neither a TCDS pointer nor a GTP event ID is available.
//NOTE(review): this listing has gaps in its line numbering (e.g. 633, 639-640, 648-649, 655-658,
//670, 673, 704); the statements that construct `aux` and store the product are not shown.
632  {
634  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
635  bool tcdsInRange;
636  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
637 
//three cases: useL1EventID_ set, no TCDS pointer (a GTP event ID is then required), TCDS present
638  if (useL1EventID_) {
641  aux.setProcessHistoryID(processHistoryID_);
642  makeEvent(eventPrincipal, aux);
643  } else if (tcds_pointer_ == nullptr) {
644  if (!GTPEventID_) {
645  throw cms::Exception("FedRawDataInputSource::read")
646  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
647  }
650  aux.setProcessHistoryID(processHistoryID_);
651  makeEvent(eventPrincipal, aux);
652  } else {
653  const FEDHeader fedHeader(tcds_pointer_);
//the TCDS record is read from directly behind the FED header
654  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
659  event_->isRealData(),
660  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
661  processGUID(),
663  !tcdsInRange);
664  aux.setProcessHistoryID(processHistoryID_);
665  makeEvent(eventPrincipal, aux);
666  }
667 
668  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
669 
671 
672  eventsThisLumi_++;
674 
675  //resize vector if needed
676  while (streamFileTracker_.size() <= eventPrincipal.streamID())
677  streamFileTracker_.push_back(-1);
678 
//remember which file index this stream is currently working on
679  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
680 
681  //this old file check runs no more often than every 10 events
//NOTE(review): the actual period is checkEvery_; its value is not visible in this listing
682  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
683  //delete files that are not in processing
684  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
685  auto it = filesToDelete_.begin();
686  while (it != filesToDelete_.end()) {
687  bool fileIsBeingProcessed = false;
688  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
689  if (it->first == streamFileTracker_.at(i)) {
690  fileIsBeingProcessed = true;
691  break;
692  }
693  }
//an entry is kept while any stream still references its index, or while fms_ reports an
//exception on data for its lumisection
694  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
//NOTE(review): fileToDelete is not used anywhere else in the visible listing
695  std::string fileToDelete = it->second->fileName_;
696  it = filesToDelete_.erase(it);
697  } else
698  it++;
699  }
700  }
//hand the chunk preceding currentChunk_ back to the free queue when it was flagged as free
701  if (chunkIsFree_)
702  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
703  chunkIsFree_ = false;
705  return;
706 }
static const uint32_t length
Definition: FEDHeader.h:54
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:197
std::vector< int > streamFileTracker_
ProductProvenance const & dummyProvenance() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
void setMonState(evf::FastMonState::InputState state)
BranchDescription const & branchDescription() const
std::unique_ptr< FRDEventMsgView > event_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511

◆ readNextChunkIntoBuffer()

void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1518 of file FedRawDataInputSource.cc.

References bufferInputRead_, eventChunkBlock_, eventChunkSize_, Exception, geometryDiff::file, fileDescriptor_, mps_fire::i, dqmdumpme::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

//Reads file data into file->chunks_[0]->buf_ with ::read(). When no descriptor is open yet
//(fileDescriptor_ < 0) the file is opened, or file->rawFd_ is reused, and the buffer is filled after
//the raw header offset; otherwise reading continues, first moving any unconsumed tail of the buffer
//to its beginning. The descriptor is closed once bufferInputRead_ equals file->fileSize_.
//Throws cms::Exception when no valid descriptor could be obtained.
//NOTE(review): listing lines 1546, 1555, 1570 and 1575 are not shown here (presumably they
//accumulate the byte counts returned by ::read() into bufferInputRead_ - confirm).
1518  {
1519  uint32_t existingSize = 0;
1520 
1521  if (fileDescriptor_ < 0) {
1522  bufferInputRead_ = 0;
1523  if (file->rawFd_ == -1) {
1524  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
//NOTE(review): lseek() runs before fileDescriptor_ is validated a few lines below
1525  if (file->rawHeaderSize_)
1526  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1527  } else
1528  fileDescriptor_ = file->rawFd_;
1529 
1530  //skip header size in destination buffer (chunk position was already adjusted)
1531  bufferInputRead_ += file->rawHeaderSize_;
1532  existingSize += file->rawHeaderSize_;
1533 
1534  if (fileDescriptor_ >= 0)
1535  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1536  else {
1537  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1538  << "failed to open file " << std::endl
1539  << file->fileName_ << " fd:" << fileDescriptor_;
1540  }
1541  //fill chunk (skipping file header if present)
//the request for the last block is shortened by existingSize
1542  for (unsigned int i = 0; i < readBlocks_; i++) {
1543  const ssize_t last = ::read(fileDescriptor_,
1544  (void*)(file->chunks_[0]->buf_ + existingSize),
1545  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1547  existingSize += last;
1548  }
1549 
1550  } else {
1551  //continue reading
1552  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1553  for (unsigned int i = 0; i < readBlocks_; i++) {
1554  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1556  existingSize += last;
1557  }
1558  } else {
1559  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1560  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1561  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1562 
1563  //calculate amount of data that can be added
//the freed space (chunkPosition_ bytes) is refilled as whole blocks plus one partial read
1564  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1565  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1566 
1567  for (uint32_t i = 0; i < blockcount; i++) {
1568  const ssize_t last =
1569  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1571  existingSizeLeft += last;
1572  }
1573  if (leftsize) {
1574  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1576  }
1577  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1578  }
1579  }
1580  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1581  if (fileDescriptor_ != -1) {
1582  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1583  close(fileDescriptor_);
//reset both the member descriptor and the one cached on the file object
1584  file->rawFd_ = fileDescriptor_ = -1;
1585  }
1586  }
1587 }
void read(edm::EventPrincipal &eventPrincipal) override
#define LogDebug(id)

◆ readSupervisor()

void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 771 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, cms::cuda::assert(), cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), evf::EvFDaqDirector::grabNextJsonFromRaw(), mps_fire::i, dqmiodatasetharvest::inf, InputFile, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, createfilelist::int, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), evf::EvFDaqDirector::lumisectionDiscarded(), maxBufferedFiles_, SiStripPI::min, eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::numConcurrentLumis(), evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, conifer::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), 
AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::unlockFULocal(), evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

771  {
772  bool stop = false;
773  unsigned int currentLumiSection = 0;
774 
775  {
776  std::unique_lock<std::mutex> lk(startupLock_);
777  startupCv_.notify_one();
778  }
779 
780  uint32_t ls = 0;
781  uint32_t monLS = 1;
782  uint32_t lockCount = 0;
783  uint64_t sumLockWaitTimeUs = 0.;
784 
785  while (!stop) {
786  //wait for at least one free thread and chunk
787  int counter = 0;
788 
789  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
791  //report state to monitoring
792  if (fms_) {
793  bool copy_active = false;
794  for (auto j : tid_active_)
795  if (j)
796  copy_active = true;
799  else if (freeChunks_.empty()) {
800  if (copy_active)
802  else
804  } else {
805  if (copy_active)
807  else
809  }
810  }
811  std::unique_lock<std::mutex> lkw(mWakeup_);
812  //sleep until woken up by condition or a timeout
813  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
814  counter++;
815  if (!(counter % 6000)) {
816  edm::LogWarning("FedRawDataInputSource")
817  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
818  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
819  << " / " << maxBufferedFiles_;
820  }
821  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
822  } else {
823  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
824  }
825  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
826  stop = true;
827  break;
828  }
829  }
830  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
831 
832  if (stop)
833  break;
834 
835  //look for a new file
836  std::string nextFile;
837  uint32_t fileSizeIndex;
838  int64_t fileSizeFromMetadata;
839 
840  if (fms_) {
843  }
844 
846  uint16_t rawHeaderSize = 0;
847  uint32_t lsFromRaw = 0;
848  int32_t serverEventsInNewFile = -1;
849  int rawFd = -1;
850 
851  int backoff_exp = 0;
852 
853  //entering loop which tries to grab new file from ramdisk
855  //check if hltd has signalled to throttle input
856  counter = 0;
857  while (daqDirector_->inputThrottled()) {
858  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
859  break;
860 
861  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
862  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
863  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
864  bool hasDiscardedLumi = false;
865  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
867  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
868  hasDiscardedLumi = true;
869  break;
870  }
871  }
872  if (hasDiscardedLumi)
873  break;
874 
876 
877  if (!(counter % 50))
878  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
879  usleep(100000);
880  counter++;
881  }
882 
883  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
884  stop = true;
885  break;
886  }
887 
888  assert(rawFd == -1);
889  uint64_t thisLockWaitTimeUs = 0.;
891  if (fileListMode_) {
892  //return LS if LS not set, otherwise return file
893  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
895  uint16_t rawDataType;
897  rawFd,
898  rawHeaderSize,
899  rawDataType,
900  lsFromRaw,
901  serverEventsInNewFile,
902  fileSizeFromMetadata,
903  false,
904  false,
905  false) != 0) {
906  //error
907  setExceptionState_ = true;
908  stop = true;
909  break;
910  }
911  if (!getLSFromFilename_)
912  ls = lsFromRaw;
913  }
914  } else if (!useFileBroker_)
916  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
917  else {
918  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
919  ls,
920  nextFile,
921  rawFd,
922  rawHeaderSize,
923  serverEventsInNewFile,
924  fileSizeFromMetadata,
925  thisLockWaitTimeUs);
926  }
927 
929 
930  //cycle through all remaining LS even if no files get assigned
931  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
933 
934  //monitoring of lock wait time
935  if (thisLockWaitTimeUs > 0.)
936  sumLockWaitTimeUs += thisLockWaitTimeUs;
937  lockCount++;
938  if (ls > monLS) {
939  monLS = ls;
940  if (lockCount)
941  if (fms_)
942  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
943  lockCount = 0;
944  sumLockWaitTimeUs = 0;
945  }
946 
947  //check again for any remaining index/EoLS files after EoR file is seen
950  usleep(100000);
951  //now all files should have appeared in ramdisk, check again if any raw files were left behind
953  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
954  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
956  }
957 
959  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
960  fileQueue_.push(std::move(inf));
961  stop = true;
962  break;
963  }
964 
965  //error from filelocking function
967  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
968  fileQueue_.push(std::move(inf));
969  stop = true;
970  break;
971  }
972  //queue new lumisection
973  if (getLSFromFilename_) {
974  if (ls > currentLumiSection) {
975  if (!useFileBroker_) {
976  //file locking
977  //setMonStateSup(inSupNewLumi);
978  currentLumiSection = ls;
979  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
980  fileQueue_.push(std::move(inf));
981  } else {
982  //new file service
983  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
985  //start transitions from LS specified by env, continue if not reached
986  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
987  //skip file if from earlier LS than specified by env
988  if (rawFd != -1) {
989  close(rawFd);
990  rawFd = -1;
991  }
993  continue;
994  } else {
995  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
996  fileQueue_.push(std::move(inf));
997  }
998  } else if (ls < 100) {
999  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1000  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1001 
1002  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1003  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1004  fileQueue_.push(std::move(inf));
1005  }
1006  } else {
1007  //start from current LS
1008  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1009  fileQueue_.push(std::move(inf));
1010  }
1011  } else {
1012  //queue all lumisections after last one seen to avoid gaps
1013  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1014  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1015  fileQueue_.push(std::move(inf));
1016  }
1017  }
1018  currentLumiSection = ls;
1019  }
1020  }
1021  //else
1022  if (currentLumiSection > 0 && ls < currentLumiSection) {
1023  edm::LogError("FedRawDataInputSource")
1024  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1025  << ". Aborting execution." << std::endl;
1026  if (rawFd != -1)
1027  close(rawFd);
1028  rawFd = -1;
1029  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1030  fileQueue_.push(std::move(inf));
1031  stop = true;
1032  break;
1033  }
1034  }
1035 
1036  int dbgcount = 0;
1039  dbgcount++;
1040  if (!(dbgcount % 20))
1041  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1042  if (!useFileBroker_)
1043  usleep(100000);
1044  else {
1045  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1046  //backoff_exp=0; // disabled!
1047  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1048  usleep(sleeptime);
1049  backoff_exp++;
1050  }
1051  } else
1052  backoff_exp = 0;
1053  }
1054  //end of file grab loop, parse result
1057  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1058 
1059  std::string rawFile;
1060  //file service will report raw extension
1061  if (useFileBroker_ || rawHeaderSize)
1062  rawFile = nextFile;
1063  else {
1064  std::filesystem::path rawFilePath(nextFile);
1065  rawFile = rawFilePath.replace_extension(".raw").string();
1066  }
1067 
1068  struct stat st;
1069  int stat_res = stat(rawFile.c_str(), &st);
1070  if (stat_res == -1) {
1071  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1072  setExceptionState_ = true;
1073  break;
1074  }
1075  uint64_t fileSize = st.st_size;
1076 
1077  if (fms_) {
1081  }
1082  int eventsInNewFile;
1083  if (fileListMode_) {
1084  if (fileSize == 0)
1085  eventsInNewFile = 0;
1086  else
1087  eventsInNewFile = -1;
1088  } else {
1090  if (!useFileBroker_) {
1091  if (rawHeaderSize) {
1092  int rawFdEmpty = -1;
1093  uint16_t rawHeaderCheck;
1094  bool fileFound;
1095  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1096  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1097  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1099  } else
1100  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1101  } else
1102  eventsInNewFile = serverEventsInNewFile;
1103  assert(eventsInNewFile >= 0);
1104  assert((eventsInNewFile > 0) ==
1105  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1106  }
1107 
1108  if (!singleBufferMode_) {
1109  //calculate number of needed chunks
1110  unsigned int neededChunks = fileSize / eventChunkSize_;
1111  if (fileSize % eventChunkSize_)
1112  neededChunks++;
1113 
1114  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1115  ls,
1116  rawFile,
1117  !fileListMode_,
1118  rawFd,
1119  fileSize,
1120  rawHeaderSize,
1121  neededChunks,
1122  eventsInNewFile,
1123  this));
1125  auto newInputFilePtr = newInputFile.get();
1126  fileQueue_.push(std::move(newInputFile));
1127 
1128  for (unsigned int i = 0; i < neededChunks; i++) {
1129  if (fms_) {
1130  bool copy_active = false;
1131  for (auto j : tid_active_)
1132  if (j)
1133  copy_active = true;
1134  if (copy_active)
1136  else
1138  }
1139  //get thread
1140  unsigned int newTid = 0xffffffff;
1141  while (!workerPool_.try_pop(newTid)) {
1142  usleep(100000);
1143  if (quit_threads_.load(std::memory_order_relaxed)) {
1144  stop = true;
1145  break;
1146  }
1147  }
1148 
1149  if (fms_) {
1150  bool copy_active = false;
1151  for (auto j : tid_active_)
1152  if (j)
1153  copy_active = true;
1154  if (copy_active)
1156  else
1158  }
1159  InputChunk* newChunk = nullptr;
1160  while (!freeChunks_.try_pop(newChunk)) {
1161  usleep(100000);
1162  if (quit_threads_.load(std::memory_order_relaxed)) {
1163  stop = true;
1164  break;
1165  }
1166  }
1167 
1168  if (newChunk == nullptr) {
1169  //return unused tid if we received shutdown (nullptr chunk)
1170  if (newTid != 0xffffffff)
1171  workerPool_.push(newTid);
1172  stop = true;
1173  break;
1174  }
1175  if (stop)
1176  break;
1178 
1179  std::unique_lock<std::mutex> lk(mReader_);
1180 
1181  unsigned int toRead = eventChunkSize_;
1182  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1183  toRead = fileSize % eventChunkSize_;
1184  newChunk->reset(i * eventChunkSize_, toRead, i);
1185 
1186  workerJob_[newTid].first = newInputFilePtr;
1187  workerJob_[newTid].second = newChunk;
1188 
1189  //wake up the worker thread
1190  cvReader_[newTid]->notify_one();
1191  }
1192  } else {
1193  if (!eventsInNewFile) {
1194  if (rawFd) {
1195  close(rawFd);
1196  rawFd = -1;
1197  }
1198  //still queue file for lumi update
1199  std::unique_lock<std::mutex> lkw(mWakeup_);
1200  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1201  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1202  ls,
1203  rawFile,
1204  !fileListMode_,
1205  rawFd,
1206  fileSize,
1207  rawHeaderSize,
1208  (rawHeaderSize > 0),
1209  0,
1210  this));
1212  fileQueue_.push(std::move(newInputFile));
1213  cvWakeup_.notify_one();
1214  break;
1215  }
1216  //in single-buffer mode put single chunk in the file and let the main thread read the file
1217  InputChunk* newChunk = nullptr;
1218  //should be available immediately
1219  while (!freeChunks_.try_pop(newChunk)) {
1220  usleep(100000);
1221  if (quit_threads_.load(std::memory_order_relaxed)) {
1222  stop = true;
1223  break;
1224  }
1225  }
1226 
1227  if (newChunk == nullptr) {
1228  stop = true;
1229  }
1230 
1231  if (stop)
1232  break;
1233 
1234  std::unique_lock<std::mutex> lkw(mWakeup_);
1235 
1236  unsigned int toRead = eventChunkSize_;
1237  if (fileSize % eventChunkSize_)
1238  toRead = fileSize % eventChunkSize_;
1239  newChunk->reset(0, toRead, 0);
1240  newChunk->readComplete_ = true;
1241 
1242  //push file and wakeup main thread
1243  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1244  ls,
1245  rawFile,
1246  !fileListMode_,
1247  rawFd,
1248  fileSize,
1249  rawHeaderSize,
1250  1,
1251  eventsInNewFile,
1252  this));
1253  newInputFile->chunks_[0] = newChunk;
1255  fileQueue_.push(std::move(newInputFile));
1256  cvWakeup_.notify_one();
1257  }
1258  }
1259  }
1261  //make sure threads finish reading
1262  unsigned numFinishedThreads = 0;
1263  while (numFinishedThreads < workerThreads_.size()) {
1264  unsigned tid = 0;
1265  while (!workerPool_.try_pop(tid)) {
1266  usleep(10000);
1267  }
1268  std::unique_lock<std::mutex> lk(mReader_);
1269  thread_quit_signal[tid] = true;
1270  cvReader_[tid]->notify_one();
1271  numFinishedThreads++;
1272  }
1273  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1274  workerThreads_[i]->join();
1275  delete workerThreads_[i];
1276  }
1277 }
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
volatile std::atomic< bool > shutdown_flag
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
constexpr int pow(int x)
Definition: conifer.h:24
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
evf::EvFDaqDirector * daqDirector_
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)

◆ readWorker()

void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1279 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), InputChunk::buf_, cvReader_, detectedFRDversion_, change_name::diff, mps_fire::end, eventChunkBlock_, geometryDiff::file, InputChunk::fileIndex_, dqmdumpme::first, FRDHeaderMaxVersion, mps_fire::i, caHitNtupletGeneratorKernels::if(), dqmdumpme::last, LogDebug, SiStripPI::min, mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, thread_quit_signal, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1279  {
1280  bool init = true;
1281 
1282  while (true) {
1283  tid_active_[tid] = false;
1284  std::unique_lock<std::mutex> lk(mReader_);
1285  workerJob_[tid].first = nullptr;
1286  workerJob_[tid].first = nullptr;
1287 
1288  assert(!thread_quit_signal[tid]); //should never get it here
1289  workerPool_.push(tid);
1290 
1291  if (init) {
1292  std::unique_lock<std::mutex> lk(startupLock_);
1293  init = false;
1294  startupCv_.notify_one();
1295  }
1296  cvReader_[tid]->wait(lk);
1297 
1298  if (thread_quit_signal[tid])
1299  return;
1300  tid_active_[tid] = true;
1301 
1302  InputFile* file;
1303  InputChunk* chunk;
1304 
1305  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1306 
1307  file = workerJob_[tid].first;
1308  chunk = workerJob_[tid].second;
1309 
1310  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1311  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1312 
1313  //if only one worker thread exists, use single fd for all operations
1314  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1315  int fileDescriptor;
1316  bool fileOpenedHere = false;
1317 
1318  if (numConcurrentReads_ == 1) {
1319  fileDescriptor = file->rawFd_;
1320  if (fileDescriptor == -1) {
1321  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1322  fileOpenedHere = true;
1323  file->rawFd_ = fileDescriptor;
1324  }
1325  } else {
1326  if (chunk->offset_ == 0) {
1327  fileDescriptor = file->rawFd_;
1328  file->rawFd_ = -1;
1329  if (fileDescriptor == -1) {
1330  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1331  fileOpenedHere = true;
1332  }
1333  } else {
1334  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1335  fileOpenedHere = true;
1336  }
1337  }
1338 
1339  if (fileDescriptor < 0) {
1340  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1341  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1342  setExceptionState_ = true;
1343  continue;
1344  }
1345  if (fileOpenedHere) { //fast forward to this chunk position
1346  off_t pos = 0;
1347  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1348  if (pos == -1) {
1349  edm::LogError("FedRawDataInputSource")
1350  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1351  << chunk->offset_ << " error: " << strerror(errno);
1352  setExceptionState_ = true;
1353  continue;
1354  }
1355  }
1356 
1357  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1358  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1359 
1360  unsigned int skipped = bufferLeft;
1362  for (unsigned int i = 0; i < readBlocks_; i++) {
1363  ssize_t last;
1364 
1365  //protect against reading into next block
1366  last = ::read(fileDescriptor,
1367  (void*)(chunk->buf_ + bufferLeft),
1368  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1369 
1370  if (last < 0) {
1371  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1372  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1373  setExceptionState_ = true;
1374  break;
1375  }
1376  if (last > 0)
1377  bufferLeft += last;
1378  if (last < eventChunkBlock_) { //last read
1379  //check if this is last block, then total read size must match file size
1380  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1381  edm::LogError("FedRawDataInputSource")
1382  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1383  << " expectedChunkSize:" << chunk->usedSize_
1384  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1385  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1386  setExceptionState_ = true;
1387  }
1388  break;
1389  }
1390  }
1391  if (setExceptionState_)
1392  continue;
1393 
1395  auto diff = end - start;
1396  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1397  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1398  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1399  << " GB/s)";
1400 
1401  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1402  close(fileDescriptor);
1403  fileDescriptor = -1;
1404  if (numConcurrentReads_ == 1)
1405  file->rawFd_ = -1;
1406  }
1407  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1408  close(fileDescriptor);
1409 
1410  //detect FRD event version. Skip file Header if it exists
1411  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1412  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1413  }
1415  chunk->readComplete_ =
1416  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1417  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1418  }
1419 }
Definition: start.py:1
void read(edm::EventPrincipal &eventPrincipal) override
constexpr size_t FRDHeaderMaxVersion
tbb::concurrent_queue< unsigned int > workerPool_
std::vector< ReaderInfo > workerJob_
Log< level::Error, false > LogError
assert(be >=bs)
U second(std::pair< T, U > const &p)
unsigned char * buf_
unsigned int fileIndex_
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
std::vector< unsigned int > thread_quit_signal
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
std::vector< unsigned int > tid_active_
#define LogDebug(id)

◆ reportEventsThisLumiInSource()

void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1589 of file FedRawDataInputSource.cc.

References events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextEvent().

1589  {
1590  std::lock_guard<std::mutex> lock(monlock_);
1591  auto itr = sourceEventsReport_.find(lumi);
1592  if (itr != sourceEventsReport_.end())
1593  itr->second += events;
1594  else
1596 }
std::map< unsigned int, unsigned int > sourceEventsReport_
int events

◆ rewind_()

void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 769 of file FedRawDataInputSource.cc.

769 {}

◆ setMonState()

void FedRawDataInputSource::setMonState ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1426 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInState(), and edm::InputSource::state().

Referenced by InputFile::advance(), checkNext(), getNextEvent(), and read().

1426  {
1427  if (fms_)
1428  fms_->setInState(state);
1429 }
ItemType state() const
Definition: InputSource.h:332
evf::FastMonitoringService * fms_
void setInState(FastMonState::InputState inputState)

◆ setMonStateSup()

void FedRawDataInputSource::setMonStateSup ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1431 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInStateSup(), and edm::InputSource::state().

Referenced by readSupervisor().

1431  {
1432  if (fms_)
1434 }
void setInStateSup(FastMonState::InputState inputState)
ItemType state() const
Definition: InputSource.h:332
evf::FastMonitoringService * fms_

◆ threadError()

void FedRawDataInputSource::threadError ( )
private

Definition at line 1421 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1421  {
1422  quit_threads_ = true;
1423  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1424 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 44 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ InputFile

friend class InputFile
friend

Definition at line 43 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ bufferInputRead_

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

◆ checkEvery_

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by read().

◆ chunkIsFree_

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentFile_

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

◆ currentFileIndex_

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentLumiSection_

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), maybeOpenNewLumiSection(), and read().

◆ cvReader_

std::vector<std::unique_ptr<std::condition_variable> > FedRawDataInputSource::cvReader_
private

◆ cvWakeup_

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

◆ daqDirector_

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = nullptr
private

◆ daqProvenanceHelper_

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

◆ defPath_

std::string FedRawDataInputSource::defPath_
private

Definition at line 88 of file FedRawDataInputSource.h.

◆ detectedFRDversion_

uint16_t FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

◆ event_

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNext(), fillFEDRawDataCollection(), getNextEvent(), and read().

◆ eventChunkBlock_

unsigned int FedRawDataInputSource::eventChunkBlock_
private

◆ eventChunkSize_

unsigned int FedRawDataInputSource::eventChunkSize_
private

◆ eventID_

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by read().

◆ eventRunNumber_

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

◆ eventsThisLumi_

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and read().

◆ eventsThisRun_

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 129 of file FedRawDataInputSource.h.

◆ fileDeleteLock_

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

◆ fileDescriptor_

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

◆ fileListIndex_

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNext(), FedRawDataInputSource(), getFile(), and read().

◆ fileListMode_

const bool FedRawDataInputSource::fileListMode_
private

◆ fileNames_

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile() and initFileList().

◆ fileNamesToDelete_

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 168 of file FedRawDataInputSource.h.

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private

◆ fms_

evf::FastMonitoringService* FedRawDataInputSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

◆ fuOutputDir_

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 114 of file FedRawDataInputSource.h.

◆ getLSFromFilename_

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and readSupervisor().

◆ GTPEventID_

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ L1EventID_

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by checkNext() and read().

◆ loopModeIterationInc_

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ MAXTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ MINTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ monlock_

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 185 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ mReader_

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

◆ mWakeup_

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ numBuffers_

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ numConcurrentReads_

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and readWorker().

◆ processHistoryID_

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

◆ quit_threads_

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

◆ readBlocks_

unsigned int FedRawDataInputSource::readBlocks_
private

◆ readingFilesCount_

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ runNumber_

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNext() and FedRawDataInputSource().

◆ setExceptionState_

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

◆ singleBufferMode_

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 184 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ startupCv_

std::condition_variable FedRawDataInputSource::startupCv_
private

◆ startupLock_

std::mutex FedRawDataInputSource::startupLock_
private

◆ streamFileTracker_

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by read().

◆ tcds_pointer_

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ testTCDSFEDRange_

const std::vector<unsigned int> FedRawDataInputSource::testTCDSFEDRange_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ thread_quit_signal

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

◆ threadInit_

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 182 of file FedRawDataInputSource.h.

◆ tid_active_

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool FedRawDataInputSource::useFileBroker_
private

◆ useL1EventID_

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by read().

◆ verifyChecksum_

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

◆ workerJob_

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by readSupervisor() and readWorker().

◆ workerThreads_

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private