CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemTypeInfo nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSource & operator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks an event as cached (presumably sets eventCached_; the brief description in the upstream header comment is garbled). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemTypeInfo state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &rawData, bool &tcdsInRange)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint16_t detectedFRDversion_ = 0
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
 
uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemPosition : char { ItemPosition::Invalid, ItemPosition::LastItemToBeMerged, ItemPosition::NotLastItemToBeMerged }
 
enum  ItemType : char {
  ItemType::IsInvalid, ItemType::IsStop, ItemType::IsFile, ItemType::IsRun,
  ItemType::IsLumi, ItemType::IsEvent, ItemType::IsRepeat, ItemType::IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 42 of file FedRawDataInputSource.h.

Member Typedef Documentation

◆ ReaderInfo

Definition at line 140 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

◆ FedRawDataInputSource()

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 51 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), MAXTCDSuTCAFEDID_, MINTCDSuTCAFEDID_, numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  eventID_(),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  //prepare data shared by threads
159  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
160  thread_quit_signal.push_back(false);
161  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
162  cvReader_.push_back(std::make_unique<std::condition_variable>());
163  tid_active_.push_back(0);
164  }
165 
166  //start threads
167  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
168  //wait for each thread to complete initialization
169  std::unique_lock<std::mutex> lk(startupLock_);
170  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
171  startupCv_.wait(lk);
172  }
173 
174  runAuxiliary()->setProcessHistoryID(processHistoryID_);
175 }
std::vector< std::string > fileNames_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
bool useFileBroker() const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::pair< InputFile *, InputChunk * > ReaderInfo
assert(be >=bs)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static Timestamp beginOfTime()
Definition: Timestamp.h:77
const std::vector< unsigned int > testTCDSFEDRange_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
std::condition_variable startupCv_
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
void setInState(FastMonState::InputState inputState)

◆ ~FedRawDataInputSource()

FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 177 of file FedRawDataInputSource.cc.

References currentFile_, cvReader_, evf::FastMonitoringService::exceptionDetected(), filesToDelete_, fms_, mps_fire::i, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, mReader_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

177  {
178  quit_threads_ = true;
179 
180  //delete any remaining open files
181  if (!fms_ || !fms_->exceptionDetected()) {
182  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
183  it->second.reset();
184  } else {
185  //skip deleting files with exception
186  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
187  //it->second->unsetDeleteFile();
188  if (fms_->isExceptionOnData(it->second->lumi_))
189  it->second->unsetDeleteFile();
190  else
191  it->second.reset();
192  }
193  //disable deleting current file with exception
194  if (currentFile_.get())
195  if (fms_->isExceptionOnData(currentFile_->lumi_))
196  currentFile_->unsetDeleteFile();
197  }
198 
200  readSupervisorThread_->join();
201  } else {
202  //join aux threads in case the supervisor thread was not started
203  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
204  std::unique_lock<std::mutex> lk(mReader_);
205  thread_quit_signal[i] = true;
206  cvReader_[i]->notify_one();
207  lk.unlock();
208  workerThreads_[i]->join();
209  delete workerThreads_[i];
210  }
211  }
212 }
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
bool isExceptionOnData(unsigned int ls)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::vector< std::thread * > workerThreads_
std::vector< unsigned int > thread_quit_signal
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::unique_ptr< InputFile > currentFile_

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next FedRawDataInputSource::checkNext ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 239 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

239  {
241  //this thread opens new files and dispatches reading to worker readers
242  std::unique_lock<std::mutex> lk(startupLock_);
243  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
245  startupCv_.wait(lk);
246  }
247  //signal hltd to start event accounting
248  if (!currentLumiSection_)
251  switch (nextEvent()) {
253  //maybe create EoL file in working directory before ending run
254  struct stat buf;
255  if (!useFileBroker_ && currentLumiSection_ > 0) {
256  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
257  if (eolFound) {
259  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
260  if (!found) {
262  int eol_fd =
263  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
264  close(eol_fd);
266  }
267  }
268  }
269  //also create EoR file in FU data directory
270  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
271  if (!eorFound) {
272  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
273  O_RDWR | O_CREAT,
274  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
275  close(eor_fd);
276  }
278  eventsThisLumi_ = 0;
280  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
281  return Next::kStop;
282  }
284  //this is not reachable
285  return Next::kEvent;
286  }
288  //std::cout << "--------------NEW LUMI---------------" << std::endl;
289  return Next::kEvent;
290  }
291  default: {
292  if (!getLSFromFilename_) {
293  //get new lumi from file header
294  if (event_->lumi() > currentLumiSection_) {
296  eventsThisLumi_ = 0;
298  }
299  }
302  else
303  eventRunNumber_ = event_->run();
304  L1EventID_ = event_->event();
305 
306  setEventCached();
307 
308  return Next::kEvent;
309  }
310  }
311 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoRFilePathOnFU() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Marks an event as cached (presumably sets eventCached_; the brief description in the upstream header comment is garbled).
Definition: InputSource.h:391
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::EvFDaqDirector::FileStatus nextEvent()

◆ exceptionState()

bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 70 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance().

◆ fillDescriptions()

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 214 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), and submitPVResolutionJobs::desc.

214  {
216  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
217  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
218  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
219  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
220  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
221  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
222  ->setComment("Maximum number of simultaneously buffered raw files");
223  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
224  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
225  desc.addUntracked<bool>("verifyChecksum", true)
226  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
227  desc.addUntracked<bool>("useL1EventID", false)
228  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
229  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
230  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
231  desc.addUntracked<bool>("fileListMode", false)
232  ->setComment("Use fileNames parameter to directly specify raw files to open");
233  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
234  ->setComment("file list used when fileListMode is enabled");
235  desc.setAllowAnything();
236  descriptions.add("source", desc);
237 }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ fillFEDRawDataCollection()

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData,
bool &  tcdsInRange 
)
private

Definition at line 718 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MAXTCDSuTCAFEDID, MAXTCDSuTCAFEDID_, FEDNumbering::MINTCDSuTCAFEDID, MINTCDSuTCAFEDID_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, l1tstage2_dqm_sourceclient-live_cfg::rawData, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and hcalRecHitTable_cff::time.

Referenced by read().

718  {
720  timeval stv;
721  gettimeofday(&stv, nullptr);
722  time = stv.tv_sec;
723  time = (time << 32) + stv.tv_usec;
724  edm::Timestamp tstamp(time);
725 
726  uint32_t eventSize = event_->eventSize();
727  unsigned char* event = (unsigned char*)event_->payload();
728  GTPEventID_ = 0;
729  tcds_pointer_ = nullptr;
730  tcdsInRange = false;
731  uint16_t selectedTCDSFed = 0;
732  while (eventSize > 0) {
733  assert(eventSize >= FEDTrailer::length);
734  eventSize -= FEDTrailer::length;
735  const FEDTrailer fedTrailer(event + eventSize);
736  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
737  assert(eventSize >= fedSize - FEDHeader::length);
738  eventSize -= (fedSize - FEDHeader::length);
739  const FEDHeader fedHeader(event + eventSize);
740  const uint16_t fedId = fedHeader.sourceID();
742  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
743  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
744  if (!selectedTCDSFed) {
745  selectedTCDSFed = fedId;
746  tcds_pointer_ = event + eventSize;
748  tcdsInRange = true;
749  }
750  } else
751  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
752  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
753  }
755  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
756  GTPEventID_ = evf::evtn::get(event + eventSize, true);
757  else
758  GTPEventID_ = evf::evtn::get(event + eventSize, false);
759  //evf::evtn::evm_board_setformat(fedSize);
760  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
761  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
762  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
763  }
764  //take event ID from GTPE FED
766  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
767  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
768  }
769  }
770  FEDRawData& fedData = rawData.FEDData(fedId);
771  fedData.resize(fedSize);
772  memcpy(fedData.data(), event + eventSize, fedSize);
773  }
774  assert(eventSize == 0);
775 
776  return tstamp;
777 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
static const uint32_t length
Definition: FEDTrailer.h:57
unsigned int get(const unsigned char *, bool)
static const uint32_t length
Definition: FEDHeader.h:54
assert(be >=bs)
unsigned long long TimeValue_t
Definition: Timestamp.h:21
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
void resize(size_t newsize, size_t wordsize=8)
Definition: FEDRawData.cc:28
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
Definition: event.py:1

◆ getEventReport()

std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1610 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

// Looks up the value stored for `lumi` in sourceEventsReport_ while holding monlock_.
// Returns {true, stored value} when an entry exists and {false, 0} otherwise.
// When `erase` is true the entry is removed from the map after its value has been copied.
1610  {
// lock_guard holds monlock_ until the function returns on either path
1611  std::lock_guard<std::mutex> lock(monlock_);
1612  auto itr = sourceEventsReport_.find(lumi);
1613  if (itr != sourceEventsReport_.end()) {
// copy the value first: the iterator is invalid once the entry has been erased
1614  std::pair<bool, unsigned int> ret(true, itr->second);
1615  if (erase)
1616  sourceEventsReport_.erase(itr);
1617  return ret;
1618  } else
1619  return std::pair<bool, unsigned int>(false, 0);
1620 }
ret
prodAgent to be discontinued
std::map< unsigned int, unsigned int > sourceEventsReport_

◆ getFile()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1657 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

// Takes the next entry of fileNames_ (position fileListIndex_), strips a leading
// "file://" or "file:" from it, stores the result in nextFile and derives `ls` from
// the file stem (or from loopModeIterationInc_ when fileListLoopMode_ is set).
// fsize and lockWaitTime are not written in the visible lines (see the commented-out
// assignments at 1679-1680).
// Listing lines 1682, 1685 and 1688 are absent from this excerpt, so the statements
// on those paths (presumably the return statements) are not visible here.
1660  {
1661  if (fileListIndex_ < fileNames_.size()) {
1662  nextFile = fileNames_[fileListIndex_];
1663  if (nextFile.find("file://") == 0)
1664  nextFile = nextFile.substr(7);
1665  else if (nextFile.find("file:") == 0)
1666  nextFile = nextFile.substr(5);
1667  std::filesystem::path fileName = nextFile;
1668  std::string fileStem = fileName.stem().string();
// NOTE(review): std::string::find returns std::string::npos (non-zero) when nothing is
// found and 0 for a match at the very start, so this condition is true for "not found"
// and false when the stem begins with "ls". On "not found", npos + 2 wraps to 1 and
// substr drops just the first character. Probably meant `!= std::string::npos` - confirm.
1669  if (fileStem.find("ls"))
1670  fileStem = fileStem.substr(fileStem.find("ls") + 2);
// NOTE(review): same pattern as above; when '_' is absent substr(0, npos) keeps the whole
// string, and when '_' is the first character nothing is truncated - confirm intent.
1671  if (fileStem.find('_'))
1672  fileStem = fileStem.substr(0, fileStem.find('_'));
1673 
// std::stoul throws (std::invalid_argument / std::out_of_range) if fileStem does not
// start with a parsable number; no handler is visible in this listing.
1674  if (!fileListLoopMode_)
1675  ls = std::stoul(fileStem);
1676  else //always starting from LS 1 in loop mode
1677  ls = 1 + loopModeIterationInc_;
1678 
1679  //fsize = 0;
1680  //lockWaitTime = 0;
1681  fileListIndex_++;
1683  } else {
1684  if (!fileListLoopMode_)
1686  else {
1687  //loop through files until interrupted
1689  fileListIndex_ = 0;
// NOTE(review): restarts from the first file by recursing. From the visible lines, an
// empty fileNames_ in loop mode would re-enter this branch on every call and never
// terminate - confirm that an empty list is excluded elsewhere.
1690  return getFile(ls, nextFile, fsize, lockWaitTime);
1691  }
1692  }
1693 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getNextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 359 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, FRDHeaderMaxVersion, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, setMonState(), singleBufferMode_, mps_update::status, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

// Prepares the next event from the current input file and leaves it in event_.
// Visible stages: obtain a file from fileQueue_ when currentFile_ is empty, handle
// empty / fully consumed files, skip an optional raw file header, locate the event
// header and payload (single-buffer or multi-buffer path), then optionally verify the
// crc32c or Adler32 checksum and increment currentFile_->nProcessed_.
// Many numbered lines of the original are absent from this excerpt (e.g. 363-364, 371,
// 375-376, 383, 386, 388, 391, 396, 400, 404, 409, 411, 415, 420, 434, 442, ...), so
// several conditions, return statements and calls are not visible; the comments below
// describe only what the listed lines show.
359  {
360  if (setExceptionState_)
361  threadError();
362  if (!currentFile_.get()) {
365  {
366  IdleSourceSentry ids(fms_);
367  if (!fileQueue_.try_pop(currentFile_)) {
368  //sleep until wakeup (only in single-buffer mode) or timeout
369  std::unique_lock<std::mutex> lkw(mWakeup_);
370  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
372  }
373  }
374  status = currentFile_->status_;
377  currentFile_.reset();
378  return status;
379  } else if (status == evf::EvFDaqDirector::runAbort) {
380  throw cms::Exception("FedRawDataInputSource::getNextEvent")
381  << "Run has been aborted by the input source reader thread";
382  } else if (status == evf::EvFDaqDirector::newLumi) {
384  if (getLSFromFilename_) {
385  if (currentFile_->lumi_ > currentLumiSection_) {
387  eventsThisLumi_ = 0;
389  }
390  } else { //let this be picked up from next event
392  }
393  currentFile_.reset();
394  return status;
395  } else if (status == evf::EvFDaqDirector::newFile) {
397  } else
398  assert(false);
399  }
401 
402  //file is empty
403  if (!currentFile_->fileSize_) {
405  //try to open new lumi
406  assert(currentFile_->nChunks_ == 0);
407  if (getLSFromFilename_)
408  if (currentFile_->lumi_ > currentLumiSection_) {
410  eventsThisLumi_ = 0;
412  }
413  //immediately delete empty file
414  currentFile_.reset();
416  }
417 
418  //file is finished
419  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
421  //release last chunk (it is never released elsewhere)
422  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
// a negative nEvents_ skips this consistency check; otherwise the processed count must match
423  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
424  throw cms::Exception("FedRawDataInputSource::getNextEvent")
425  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
426  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
427  }
428  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
429  if (singleBufferMode_) {
430  std::unique_lock<std::mutex> lkw(mWakeup_);
431  cvWakeup_.notify_one();
432  }
433  bufferInputRead_ = 0;
435  //put the file in pending delete list;
436  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
// ownership of the InputFile moves into filesToDelete_; currentFile_ is empty afterwards
437  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
438  } else {
439  //in single-thread and stream jobs, events are already processed
440  currentFile_.reset();
441  }
443  }
444 
445  //handle RAW file header
446  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
447  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
448  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
449  throw cms::Exception("FedRawDataInputSource::getNextEvent")
450  << "Premature end of input file while reading file header";
451 
452  edm::LogWarning("FedRawDataInputSource")
453  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
454  if (getLSFromFilename_)
455  if (currentFile_->lumi_ > currentLumiSection_) {
457  eventsThisLumi_ = 0;
459  }
460  }
461 
462  //advance buffer position to skip file header (chunk will be acquired later)
463  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
464  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
465  }
466 
467  //file is too short
468  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
469  throw cms::Exception("FedRawDataInputSource::getNextEvent")
470  << "Premature end of input file while reading event header";
471  }
472  if (singleBufferMode_) {
473  //should already be there
475  {
476  IdleSourceSentry ids(fms_);
// polls every 10 ms (usleep(10000)) until the chunk is reported available
477  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
478  usleep(10000);
479  if (currentFile_->parent_->exceptionState() || setExceptionState_)
480  currentFile_->parent_->threadError();
481  }
482  }
484 
485  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
486 
487  //conditions when read amount is not sufficient for the header to fit
491 
492  if (detectedFRDversion_ == 0) {
// NOTE(review): reads the first two bytes at dataPosition through a uint16_t* cast;
// alignment of dataPosition is not established by the visible lines - confirm.
493  detectedFRDversion_ = *((uint16_t*)dataPosition);
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << "Unknown FRD version -: " << detectedFRDversion_;
498  }
499 
500  //recalculate chunk position
501  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
503  throw cms::Exception("FedRawDataInputSource::getNextEvent")
504  << "Premature end of input file while reading event header";
505  }
506  }
507 
508  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
509  if (event_->size() > eventChunkSize_) {
510  throw cms::Exception("FedRawDataInputSource::getNextEvent")
511  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
512  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
513  << " bytes";
514  }
515 
// NOTE(review): no visible check that event_->size() is at least the header size; the
// difference is stored in an unsigned 32-bit value and would wrap otherwise - confirm.
516  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
517 
518  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
519  throw cms::Exception("FedRawDataInputSource::getNextEvent")
520  << "Premature end of input file while reading event data";
521  }
522  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
524  //recalculate chunk position
525  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
526  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
527  }
528  currentFile_->bufferPosition_ += event_->size();
529  currentFile_->chunkPosition_ += event_->size();
530  //last chunk is released when this function is invoked next time
531 
532  }
533  //multibuffer mode:
534  else {
535  //wait for the current chunk to become added to the vector
537  {
538  IdleSourceSentry ids(fms_);
539  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
540  usleep(10000);
541  if (setExceptionState_)
542  threadError();
543  }
544  }
546 
547  //check if header is at the boundary of two chunks
548  chunkIsFree_ = false;
549  unsigned char* dataPosition;
550 
551  //read header, copy it to a single chunk if necessary
553  throw cms::Exception("FedRawDataInputSource::getNextEvent")
554  << "Premature end of input file (missing:"
556  << ") while reading event data for next event header";
// advance() sets dataPosition (passed as an argument, uninitialised before this call);
// its bool result is treated below as "a chunk boundary was crossed"
557  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
558 
559  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
560  if (event_->size() > eventChunkSize_) {
561  throw cms::Exception("FedRawDataInputSource::getNextEvent")
562  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
563  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
564  << " bytes";
565  }
566 
// NOTE(review): same unsigned subtraction as in the single-buffer path above - confirm
// that event_->size() cannot be smaller than the header size.
567  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
568 
569  if (currentFile_->fileSizeLeft() < msgSize) {
570  throw cms::Exception("FedRawDataInputSource::getNextEvent")
571  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
572  << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
573  }
574 
575  if (chunkEnd) {
576  //header was at the chunk boundary, we will have to move payload as well
577  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
578  chunkIsFree_ = true;
579  } else {
580  //header was contiguous, but check if payload fits the chunk
581  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
582  //rewind to header start position
584  //copy event to a chunk start and move pointers
585 
587  {
588  IdleSourceSentry ids(fms_);
589  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
590  }
592 
593  assert(chunkEnd);
594  chunkIsFree_ = true;
595  //header is moved
596  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
597  } else {
598  //everything is in a single chunk, only move pointers forward
599  chunkEnd = currentFile_->advance(dataPosition, msgSize);
600  assert(!chunkEnd);
601  chunkIsFree_ = false;
602  }
603  }
604  //sanity-check check that the buffer position has not exceeded file size after preparing event
605  if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
606  throw cms::Exception("FedRawDataInputSource::getNextEvent")
607  << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
608  << " after reading last event declared size of " << event_->size() << " bytes";
609  }
610  } //end multibuffer mode
612 
// checksum verification: crc32c for event format version >= 5, Adler32 for versions 3-4,
// none for older versions or when verifyChecksum_ is false
613  if (verifyChecksum_ && event_->version() >= 5) {
614  uint32_t crc = 0;
615  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
616  if (crc != event_->crc32c()) {
617  if (fms_)
619  throw cms::Exception("FedRawDataInputSource::getNextEvent")
620  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
621  << crc;
622  }
623  } else if (verifyChecksum_ && event_->version() >= 3) {
624  uint32_t adler = adler32(0L, Z_NULL, 0);
625  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
626 
627  if (adler != event_->adler32()) {
628  if (fms_)
630  throw cms::Exception("FedRawDataInputSource::getNextEvent")
631  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
632  << adler;
633  }
634  }
636 
637  currentFile_->nProcessed_++;
638 
640 }
std::condition_variable cvWakeup_
constexpr size_t FRDHeaderMaxVersion
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void setMonState(evf::FastMonState::InputState state)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::unique_ptr< FRDEventMsgView > event_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511

◆ initFileList()

long FedRawDataInputSource::initFileList ( )
private

Definition at line 1622 of file FedRawDataInputSource.cc.

References a, b, mps_fire::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, fileNames_, castor_dqm_sourceclient_file_cfg::path, jetUpdater_cfi::sort, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

// Sorts fileNames_ and tries to read a run number from a file name of the form
// "run<number>[_...]".
// Returns the parsed number, or -1 when fileNames_ is empty, the stem does not start
// with "run", or the number cannot be parsed.
// Listing line 1633 (where fileName is declared) is absent from this excerpt.
1622  {
// The comparator takes its arguments by value because it trims them in place: each name
// is reduced to the part starting at its last '/', then compared so that the result is
// ascending order (b > a).
1623  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1624  if (a.rfind('/') != std::string::npos)
1625  a = a.substr(a.rfind('/'));
1626  if (b.rfind('/') != std::string::npos)
1627  b = b.substr(b.rfind('/'));
1628  return b > a;
1629  });
1630 
1631  if (!fileNames_.empty()) {
1632  //get run number from first file in the vector
1634  std::string fileStem = fileName.stem().string();
// NOTE(review): the "file://" / "file:" prefixes are tested on the stem, not on the full
// name; presumably fileName is a std::filesystem::path, in which case the stem of a name
// containing '/' no longer includes the scheme - confirm against line 1633.
1635  if (fileStem.find("file://") == 0)
1636  fileStem = fileStem.substr(7);
1637  else if (fileStem.find("file:") == 0)
1638  fileStem = fileStem.substr(5);
// when no '_' is present, end is npos and the substr at 1642 takes the rest of the stem
1639  auto end = fileStem.find('_');
1640 
1641  if (fileStem.find("run") == 0) {
1642  std::string runStr = fileStem.substr(3, end - 3);
1643  try {
1644  //get long to support test run numbers < 2^32
1645  long rval = std::stol(runStr);
1646  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1647  return rval;
1648  } catch (const std::exception&) {
// parse failure is only logged; execution falls through to the final return -1
1649  edm::LogWarning("FedRawDataInputSource")
1650  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1651  }
1652  }
1653  }
1654  return -1;
1655 }
std::vector< std::string > fileNames_
Log< level::Info, false > LogInfo
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
Log< level::Warning, false > LogWarning

◆ maybeOpenNewLumiSection()

void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 313 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNext(), and getNextEvent().

// Switches the source to `lumiSection` when there is no current luminosity-block
// auxiliary or its luminosity block number differs from `lumiSection`; otherwise does
// nothing. On a switch it updates currentLumiSection_, installs a new auxiliary stamped
// with the current wall-clock time, and logs the change.
// Listing lines 317, 321, 326, 334 and 340 are absent from this excerpt (including the
// declaration of fuEoLS and the construction of lumiBlockAuxiliary).
313  {
314  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
// file-based bookkeeping below is skipped entirely when useFileBroker_ is set
315  if (!useFileBroker_) {
316  if (currentLumiSection_ > 0) {
318  struct stat buf;
319  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
320  if (!found) {
// NOTE(review): the descriptor returned by open() is passed to close() without checking
// for -1, so a failure to create the file goes unnoticed - confirm this is acceptable.
322  int eol_fd =
323  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
324  close(eol_fd);
325  daqDirector_->createBoLSFile(lumiSection, false);
327  }
328  } else
329  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
330  }
331 
332  currentLumiSection_ = lumiSection;
333 
335 
336  timeval tv;
337  gettimeofday(&tv, nullptr);
// timestamp value is tv_sec * 1000000 + tv_usec
338  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
339 
341  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
342 
343  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
344  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
345 
346  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
347  }
348 }
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
edm::ProcessHistoryID processHistoryID_
Log< level::Info, false > LogInfo
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
evf::EvFDaqDirector * daqDirector_
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ nextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 350 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNext().

// Returns a FileStatus held in `status`.
// Listing lines 351-352 (the declaration of `status` and the header of the enclosing
// loop) are absent from this excerpt; presumably the loop repeats getNextEvent() while
// it reports evf::EvFDaqDirector::noFile - confirm against the source file.
350  {
// leave the loop early once a shutdown has been requested (relaxed atomic load)
353  if (edm::shutdown_flag.load(std::memory_order_relaxed))
354  break;
355  }
356  return status;
357 }
volatile std::atomic< bool > shutdown_flag
def load(fileName)
Definition: svgfig.py:547
evf::EvFDaqDirector::FileStatus getNextEvent()

◆ read()

void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 642 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, runTheMatrix::const, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), event_, eventID_, eventRunNumber_, eventsThisLumi_, Exception, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), l1tstage2_dqm_sourceclient-live_cfg::rawData, setMonState(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), readNextChunkIntoBuffer(), and readWorker().

// Builds the framework event for the event currently held in event_: fills a
// FEDRawDataCollection, creates the event via makeEvent() using one of three
// event-auxiliary variants (useL1EventID_, no TCDS pointer, or TCDS data), records which
// file each stream is working on, periodically drops finished files from
// filesToDelete_, and returns a free chunk to freeChunks_ when chunkIsFree_ is set.
// Many numbered lines are absent from this excerpt (e.g. 643, 649-650, 658-659, 665-668,
// 672, 680, 683, 714), including the construction of `aux` in each branch and the call
// that stores `edp` in the event principal.
642  {
644  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
// NOTE(review): tcdsInRange is not initialised here; it is passed by reference to
// fillFEDRawDataCollection, which presumably always assigns it - confirm.
645  bool tcdsInRange;
646  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
647 
648  if (useL1EventID_) {
651  aux.setProcessHistoryID(processHistoryID_);
652  makeEvent(eventPrincipal, aux);
653  } else if (tcds_pointer_ == nullptr) {
654  if (!GTPEventID_) {
655  throw cms::Exception("FedRawDataInputSource::read")
656  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
657  }
660  aux.setProcessHistoryID(processHistoryID_);
661  makeEvent(eventPrincipal, aux);
662  } else {
663  const FEDHeader fedHeader(tcds_pointer_);
// the TCDS payload is read from directly after the FED header (offset FEDHeader::length)
664  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
669  event_->isRealData(),
670  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
671  processGUID(),
673  !tcdsInRange);
674  aux.setProcessHistoryID(processHistoryID_);
675  makeEvent(eventPrincipal, aux);
676  }
677 
678  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
679 
681 
682  eventsThisLumi_++;
684 
685  //resize vector if needed
686  while (streamFileTracker_.size() <= eventPrincipal.streamID())
687  streamFileTracker_.push_back(-1);
688 
689  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
690 
// NOTE(review): the interval is checkEvery_, whose value is not visible in this excerpt;
// the "every 10 events" wording below is unverified here.
691  //this old file check runs no more often than every 10 events
692  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
693  //delete files that are not in processing
694  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
695  auto it = filesToDelete_.begin();
696  while (it != filesToDelete_.end()) {
// a file is still in use if any stream's tracker entry equals its index
697  bool fileIsBeingProcessed = false;
698  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
699  if (it->first == streamFileTracker_.at(i)) {
700  fileIsBeingProcessed = true;
701  break;
702  }
703  }
704  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
// NOTE(review): fileToDelete is not used in the visible lines; erasing the list entry
// destroys the owning unique_ptr<InputFile>.
705  std::string fileToDelete = it->second->fileName_;
706  it = filesToDelete_.erase(it);
707  } else
708  it++;
709  }
710  }
// hands back the chunk preceding currentChunk_ when chunkIsFree_ is set
711  if (chunkIsFree_)
712  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
713  chunkIsFree_ = false;
715  return;
716 }
static const uint32_t length
Definition: FEDHeader.h:54
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:226
std::vector< int > streamFileTracker_
ProductProvenance const & dummyProvenance() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
void setMonState(evf::FastMonState::InputState state)
BranchDescription const & branchDescription() const
std::unique_ptr< FRDEventMsgView > event_
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511

◆ readNextChunkIntoBuffer()

void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1530 of file FedRawDataInputSource.cc.

References bufferInputRead_, eventChunkBlock_, eventChunkSize_, Exception, geometryDiff::file, fileDescriptor_, mps_fire::i, dqmdumpme::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

// Reads file data into file->chunks_[0]->buf_.
// If no file is open (fileDescriptor_ < 0) it opens file->fileName_ or adopts
// file->rawFd_, accounts for the raw header, and fills the buffer in readBlocks_ reads.
// Otherwise it continues an open file: either refilling from the buffer start, or first
// moving the unread tail to the front and reading enough to top the buffer up.
// When bufferInputRead_ equals file->fileSize_ the descriptor is closed.
// Throws cms::Exception if the file cannot be opened.
// Listing lines 1558, 1567, 1582 and 1587 are absent from this excerpt (each directly
// follows a ::read call).
1530  {
1531  uint32_t existingSize = 0;
1532 
1533  if (fileDescriptor_ < 0) {
1534  bufferInputRead_ = 0;
1535  if (file->rawFd_ == -1) {
1536  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
// NOTE(review): lseek is called before the descriptor is validated at 1546, and its
// return value is not checked - confirm.
1537  if (file->rawHeaderSize_)
1538  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1539  } else
1540  fileDescriptor_ = file->rawFd_;
1541 
1542  //skip header size in destination buffer (chunk position was already adjusted)
1543  bufferInputRead_ += file->rawHeaderSize_;
1544  existingSize += file->rawHeaderSize_;
1545 
1546  if (fileDescriptor_ >= 0)
1547  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1548  else {
1549  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1550  << "failed to open file " << std::endl
1551  << file->fileName_ << " fd:" << fileDescriptor_;
1552  }
1553  //fill chunk (skipping file header if present)
// NOTE(review): on the last iteration the size is reduced by the running total
// existingSize (header plus bytes already read in this loop), not by the header size
// alone; with readBlocks_ > 1 and full earlier reads the subtraction could go below
// zero - confirm intent. The result of ::read (-1 on error) is added to an unsigned
// counter without a visible check.
1554  for (unsigned int i = 0; i < readBlocks_; i++) {
1555  const ssize_t last = ::read(fileDescriptor_,
1556  (void*)(file->chunks_[0]->buf_ + existingSize),
1557  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1559  existingSize += last;
1560  }
1561 
1562  } else {
1563  //continue reading
1564  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1565  for (unsigned int i = 0; i < readBlocks_; i++) {
1566  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1568  existingSize += last;
1569  }
1570  } else {
1571  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1572  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
// memmove is used because source and destination lie in the same buffer and may overlap
1573  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1574 
1575  //calculate amount of data that can be added
// the space freed at the end equals chunkPosition_ bytes: whole blocks plus a remainder
1576  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1577  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1578 
1579  for (uint32_t i = 0; i < blockcount; i++) {
1580  const ssize_t last =
1581  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1583  existingSizeLeft += last;
1584  }
1585  if (leftsize) {
1586  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1588  }
1589  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1590  }
1591  }
1592  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1593  if (fileDescriptor_ != -1) {
1594  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1595  close(fileDescriptor_);
// both the member and the file's own descriptor are reset so neither is reused
1596  file->rawFd_ = fileDescriptor_ = -1;
1597  }
1598  }
1599 }
void read(edm::EventPrincipal &eventPrincipal) override
#define LogDebug(id)

◆ readSupervisor()

void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 781 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, cms::cuda::assert(), cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), evf::EvFDaqDirector::grabNextJsonFromRaw(), mps_fire::i, dqmiodatasetharvest::inf, InputFile, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, createfilelist::int, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), evf::EvFDaqDirector::lumisectionDiscarded(), maxBufferedFiles_, SiStripPI::min, eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::numConcurrentLumis(), evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), 
AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::unlockFULocal(), evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

781  {
782  bool stop = false;
783  unsigned int currentLumiSection = 0;
784 
785  {
786  std::unique_lock<std::mutex> lk(startupLock_);
787  startupCv_.notify_one();
788  }
789 
790  uint32_t ls = 0;
791  uint32_t monLS = 1;
792  uint32_t lockCount = 0;
793  uint64_t sumLockWaitTimeUs = 0.;
794 
795  while (!stop) {
796  //wait for at least one free thread and chunk
797  int counter = 0;
798 
799  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
801  //report state to monitoring
802  if (fms_) {
803  bool copy_active = false;
804  for (auto j : tid_active_)
805  if (j)
806  copy_active = true;
809  else if (freeChunks_.empty()) {
810  if (copy_active)
812  else
814  } else {
815  if (copy_active)
817  else
819  }
820  }
821  std::unique_lock<std::mutex> lkw(mWakeup_);
822  //sleep until woken up by condition or a timeout
823  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
824  counter++;
825  if (!(counter % 6000)) {
826  edm::LogWarning("FedRawDataInputSource")
827  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
828  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
829  << " / " << maxBufferedFiles_;
830  }
831  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
832  } else {
833  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
834  }
835  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
836  stop = true;
837  break;
838  }
839  }
840  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
841 
842  if (stop)
843  break;
844 
845  //look for a new file
846  std::string nextFile;
847  uint32_t fileSizeIndex;
848  int64_t fileSizeFromMetadata;
849 
850  if (fms_) {
853  }
854 
856  uint16_t rawHeaderSize = 0;
857  uint32_t lsFromRaw = 0;
858  int32_t serverEventsInNewFile = -1;
859  int rawFd = -1;
860 
861  int backoff_exp = 0;
862 
863  //entering loop which tries to grab new file from ramdisk
865  //check if hltd has signalled to throttle input
866  counter = 0;
867  while (daqDirector_->inputThrottled()) {
868  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
869  break;
870 
871  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
872  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
873  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
874  bool hasDiscardedLumi = false;
875  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
877  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
878  hasDiscardedLumi = true;
879  break;
880  }
881  }
882  if (hasDiscardedLumi)
883  break;
884 
886 
887  if (!(counter % 50))
888  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
889  usleep(100000);
890  counter++;
891  }
892 
893  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
894  stop = true;
895  break;
896  }
897 
898  assert(rawFd == -1);
899  uint64_t thisLockWaitTimeUs = 0.;
901  if (fileListMode_) {
902  //return LS if LS not set, otherwise return file
903  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
905  uint16_t rawDataType;
907  rawFd,
908  rawHeaderSize,
909  rawDataType,
910  lsFromRaw,
911  serverEventsInNewFile,
912  fileSizeFromMetadata,
913  false,
914  false,
915  false) != 0) {
916  //error
917  setExceptionState_ = true;
918  stop = true;
919  break;
920  }
921  if (!getLSFromFilename_)
922  ls = lsFromRaw;
923  }
924  } else if (!useFileBroker_)
926  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
927  else {
928  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
929  ls,
930  nextFile,
931  rawFd,
932  rawHeaderSize,
933  serverEventsInNewFile,
934  fileSizeFromMetadata,
935  thisLockWaitTimeUs);
936  }
937 
939 
940  //cycle through all remaining LS even if no files get assigned
941  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
943 
944  //monitoring of lock wait time
945  if (thisLockWaitTimeUs > 0.)
946  sumLockWaitTimeUs += thisLockWaitTimeUs;
947  lockCount++;
948  if (ls > monLS) {
949  monLS = ls;
950  if (lockCount)
951  if (fms_)
952  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
953  lockCount = 0;
954  sumLockWaitTimeUs = 0;
955  }
956 
957  //check again for any remaining index/EoLS files after EoR file is seen
960  usleep(100000);
961  //now all files should have appeared in ramdisk, check again if any raw files were left behind
963  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
964  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
966  }
967 
969  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
970  fileQueue_.push(std::move(inf));
971  stop = true;
972  break;
973  }
974 
975  //error from filelocking function
977  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
978  fileQueue_.push(std::move(inf));
979  stop = true;
980  break;
981  }
982  //queue new lumisection
983  if (getLSFromFilename_) {
984  if (ls > currentLumiSection) {
985  if (!useFileBroker_) {
986  //file locking
987  //setMonStateSup(inSupNewLumi);
988  currentLumiSection = ls;
989  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
990  fileQueue_.push(std::move(inf));
991  } else {
992  //new file service
993  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
995  //start transitions from LS specified by env, continue if not reached
996  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
997  //skip file if from earlier LS than specified by env
998  if (rawFd != -1) {
999  close(rawFd);
1000  rawFd = -1;
1001  }
1003  continue;
1004  } else {
1005  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1006  fileQueue_.push(std::move(inf));
1007  }
1008  } else if (ls < 100) {
1009  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1010  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1011 
1012  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1013  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1014  fileQueue_.push(std::move(inf));
1015  }
1016  } else {
1017  //start from current LS
1018  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1019  fileQueue_.push(std::move(inf));
1020  }
1021  } else {
1022  //queue all lumisections after last one seen to avoid gaps
1023  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1024  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1025  fileQueue_.push(std::move(inf));
1026  }
1027  }
1028  currentLumiSection = ls;
1029  }
1030  }
1031  //else
1032  if (currentLumiSection > 0 && ls < currentLumiSection) {
1033  edm::LogError("FedRawDataInputSource")
1034  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1035  << ". Aborting execution." << std::endl;
1036  if (rawFd != -1)
1037  close(rawFd);
1038  rawFd = -1;
1039  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1040  fileQueue_.push(std::move(inf));
1041  stop = true;
1042  break;
1043  }
1044  }
1045 
1046  int dbgcount = 0;
1049  dbgcount++;
1050  if (!(dbgcount % 20))
1051  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1052  if (!useFileBroker_)
1053  usleep(100000);
1054  else {
1055  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1056  //backoff_exp=0; // disabled!
1057  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1058  usleep(sleeptime);
1059  backoff_exp++;
1060  }
1061  } else
1062  backoff_exp = 0;
1063  }
1064  //end of file grab loop, parse result
1067  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1068 
1069  std::string rawFile;
1070  //file service will report raw extension
1071  if (useFileBroker_ || rawHeaderSize)
1072  rawFile = nextFile;
1073  else {
1074  std::filesystem::path rawFilePath(nextFile);
1075  rawFile = rawFilePath.replace_extension(".raw").string();
1076  }
1077 
1078  struct stat st;
1079  int stat_res = stat(rawFile.c_str(), &st);
1080  if (stat_res == -1) {
1081  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1082  setExceptionState_ = true;
1083  break;
1084  }
1085  uint64_t fileSize = st.st_size;
1086 
1087  if (fms_) {
1091  }
1092  int eventsInNewFile;
1093  if (fileListMode_) {
1094  if (fileSize == 0)
1095  eventsInNewFile = 0;
1096  else
1097  eventsInNewFile = -1;
1098  } else {
1100  if (!useFileBroker_) {
1101  if (rawHeaderSize) {
1102  int rawFdEmpty = -1;
1103  uint16_t rawHeaderCheck;
1104  bool fileFound;
1105  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1106  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1107  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1109  } else
1110  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1111  } else
1112  eventsInNewFile = serverEventsInNewFile;
1113  assert(eventsInNewFile >= 0);
1114  assert((eventsInNewFile > 0) ==
1115  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1116  }
1117 
1118  if (!singleBufferMode_) {
1119  //calculate number of needed chunks
1120  unsigned int neededChunks = fileSize / eventChunkSize_;
1121  if (fileSize % eventChunkSize_)
1122  neededChunks++;
1123 
1124  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1125  ls,
1126  rawFile,
1127  !fileListMode_,
1128  rawFd,
1129  fileSize,
1130  rawHeaderSize,
1131  neededChunks,
1132  eventsInNewFile,
1133  this));
1135  auto newInputFilePtr = newInputFile.get();
1136  fileQueue_.push(std::move(newInputFile));
1137 
1138  for (unsigned int i = 0; i < neededChunks; i++) {
1139  if (fms_) {
1140  bool copy_active = false;
1141  for (auto j : tid_active_)
1142  if (j)
1143  copy_active = true;
1144  if (copy_active)
1146  else
1148  }
1149  //get thread
1150  unsigned int newTid = 0xffffffff;
1151  while (!workerPool_.try_pop(newTid)) {
1152  usleep(100000);
1153  if (quit_threads_.load(std::memory_order_relaxed)) {
1154  stop = true;
1155  break;
1156  }
1157  }
1158 
1159  if (fms_) {
1160  bool copy_active = false;
1161  for (auto j : tid_active_)
1162  if (j)
1163  copy_active = true;
1164  if (copy_active)
1166  else
1168  }
1169  InputChunk* newChunk = nullptr;
1170  while (!freeChunks_.try_pop(newChunk)) {
1171  usleep(100000);
1172  if (quit_threads_.load(std::memory_order_relaxed)) {
1173  stop = true;
1174  break;
1175  }
1176  }
1177 
1178  if (newChunk == nullptr) {
1179  //return unused tid if we received shutdown (nullptr chunk)
1180  if (newTid != 0xffffffff)
1181  workerPool_.push(newTid);
1182  stop = true;
1183  break;
1184  }
1185  if (stop)
1186  break;
1188 
1189  std::unique_lock<std::mutex> lk(mReader_);
1190 
1191  unsigned int toRead = eventChunkSize_;
1192  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1193  toRead = fileSize % eventChunkSize_;
1194  newChunk->reset(i * eventChunkSize_, toRead, i);
1195 
1196  workerJob_[newTid].first = newInputFilePtr;
1197  workerJob_[newTid].second = newChunk;
1198 
1199  //wake up the worker thread
1200  cvReader_[newTid]->notify_one();
1201  }
1202  } else {
1203  if (!eventsInNewFile) {
1204  if (rawFd) {
1205  close(rawFd);
1206  rawFd = -1;
1207  }
1208  //still queue file for lumi update
1209  std::unique_lock<std::mutex> lkw(mWakeup_);
1210  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1211  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1212  ls,
1213  rawFile,
1214  !fileListMode_,
1215  rawFd,
1216  fileSize,
1217  rawHeaderSize,
1218  (rawHeaderSize > 0),
1219  0,
1220  this));
1222  fileQueue_.push(std::move(newInputFile));
1223  cvWakeup_.notify_one();
1224  break;
1225  }
1226  //in single-buffer mode put single chunk in the file and let the main thread read the file
1227  InputChunk* newChunk = nullptr;
1228  //should be available immediately
1229  while (!freeChunks_.try_pop(newChunk)) {
1230  usleep(100000);
1231  if (quit_threads_.load(std::memory_order_relaxed)) {
1232  stop = true;
1233  break;
1234  }
1235  }
1236 
1237  if (newChunk == nullptr) {
1238  stop = true;
1239  }
1240 
1241  if (stop)
1242  break;
1243 
1244  std::unique_lock<std::mutex> lkw(mWakeup_);
1245 
1246  unsigned int toRead = eventChunkSize_;
1247  if (fileSize % eventChunkSize_)
1248  toRead = fileSize % eventChunkSize_;
1249  newChunk->reset(0, toRead, 0);
1250  newChunk->readComplete_ = true;
1251 
1252  //push file and wakeup main thread
1253  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1254  ls,
1255  rawFile,
1256  !fileListMode_,
1257  rawFd,
1258  fileSize,
1259  rawHeaderSize,
1260  1,
1261  eventsInNewFile,
1262  this));
1263  newInputFile->chunks_[0] = newChunk;
1265  fileQueue_.push(std::move(newInputFile));
1266  cvWakeup_.notify_one();
1267  }
1268  }
1269  }
1271  //make sure threads finish reading
1272  unsigned numFinishedThreads = 0;
1273  while (numFinishedThreads < workerThreads_.size()) {
1274  unsigned tid = 0;
1275  while (!workerPool_.try_pop(tid)) {
1276  usleep(10000);
1277  }
1278  std::unique_lock<std::mutex> lk(mReader_);
1279  thread_quit_signal[tid] = true;
1280  cvReader_[tid]->notify_one();
1281  numFinishedThreads++;
1282  }
1283  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1284  workerThreads_[i]->join();
1285  delete workerThreads_[i];
1286  }
1287 }
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
volatile std::atomic< bool > shutdown_flag
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
evf::EvFDaqDirector * daqDirector_
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)

◆ readWorker()

void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1289 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), InputChunk::buf_, cvReader_, cvWakeup_, detectedFRDversion_, change_name::diff, mps_fire::end, eventChunkBlock_, geometryDiff::file, InputChunk::fileIndex_, dqmdumpme::first, FRDHeaderMaxVersion, mps_fire::i, ALPAKA_ACCELERATOR_NAMESPACE::caPixelDoublets::if(), dqmdumpme::last, LogDebug, SiStripPI::min, mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, thread_quit_signal, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1289  {
1290  bool init = true;
1291 
1292  while (true) {
1293  tid_active_[tid] = false;
1294  std::unique_lock<std::mutex> lk(mReader_);
1295  workerJob_[tid].first = nullptr;
1296  workerJob_[tid].first = nullptr;
1297 
1298  assert(!thread_quit_signal[tid]); //should never get it here
1299  workerPool_.push(tid);
1300 
1301  if (init) {
1302  std::unique_lock<std::mutex> lks(startupLock_);
1303  init = false;
1304  startupCv_.notify_one();
1305  }
1306  cvWakeup_.notify_all();
1307  cvReader_[tid]->wait(lk);
1308  lk.unlock();
1309 
1310  if (thread_quit_signal[tid])
1311  return;
1312  tid_active_[tid] = true;
1313 
1314  InputFile* file;
1315  InputChunk* chunk;
1316 
1317  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1318 
1319  file = workerJob_[tid].first;
1320  chunk = workerJob_[tid].second;
1321 
1322  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1323  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1324 
1325  //if only one worker thread exists, use single fd for all operations
1326  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1327  int fileDescriptor;
1328  bool fileOpenedHere = false;
1329 
1330  if (numConcurrentReads_ == 1) {
1331  fileDescriptor = file->rawFd_;
1332  if (fileDescriptor == -1) {
1333  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1334  fileOpenedHere = true;
1335  file->rawFd_ = fileDescriptor;
1336  }
1337  } else {
1338  if (chunk->offset_ == 0) {
1339  fileDescriptor = file->rawFd_;
1340  file->rawFd_ = -1;
1341  if (fileDescriptor == -1) {
1342  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1343  fileOpenedHere = true;
1344  }
1345  } else {
1346  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1347  fileOpenedHere = true;
1348  }
1349  }
1350 
1351  if (fileDescriptor < 0) {
1352  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1353  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1354  setExceptionState_ = true;
1355  continue;
1356  }
1357  if (fileOpenedHere) { //fast forward to this chunk position
1358  off_t pos = 0;
1359  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1360  if (pos == -1) {
1361  edm::LogError("FedRawDataInputSource")
1362  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1363  << chunk->offset_ << " error: " << strerror(errno);
1364  setExceptionState_ = true;
1365  continue;
1366  }
1367  }
1368 
1369  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1370  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1371 
1372  unsigned int skipped = bufferLeft;
1374  for (unsigned int i = 0; i < readBlocks_; i++) {
1375  ssize_t last;
1376 
1377  //protect against reading into next block
1378  last = ::read(fileDescriptor,
1379  (void*)(chunk->buf_ + bufferLeft),
1380  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1381 
1382  if (last < 0) {
1383  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1384  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1385  setExceptionState_ = true;
1386  break;
1387  }
1388  if (last > 0)
1389  bufferLeft += last;
1390  if (last < eventChunkBlock_) { //last read
1391  //check if this is last block, then total read size must match file size
1392  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1393  edm::LogError("FedRawDataInputSource")
1394  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1395  << " expectedChunkSize:" << chunk->usedSize_
1396  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1397  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1398  setExceptionState_ = true;
1399  }
1400  break;
1401  }
1402  }
1403  if (setExceptionState_)
1404  continue;
1405 
1407  auto diff = end - start;
1408  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1409  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1410  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1411  << " GB/s)";
1412 
1413  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1414  close(fileDescriptor);
1415  fileDescriptor = -1;
1416  if (numConcurrentReads_ == 1)
1417  file->rawFd_ = -1;
1418  }
1419  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1420  close(fileDescriptor);
1421 
1422  //detect FRD event version. Skip file Header if it exists
1423  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1424  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1425  }
1427  chunk->readComplete_ =
1428  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1429  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1430  }
1431 }
Definition: start.py:1
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
constexpr size_t FRDHeaderMaxVersion
tbb::concurrent_queue< unsigned int > workerPool_
std::vector< ReaderInfo > workerJob_
Log< level::Error, false > LogError
assert(be >=bs)
U second(std::pair< T, U > const &p)
unsigned char * buf_
unsigned int fileIndex_
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
std::vector< unsigned int > thread_quit_signal
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
std::vector< unsigned int > tid_active_
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
#define LogDebug(id)

◆ reportEventsThisLumiInSource()

void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1601 of file FedRawDataInputSource.cc.

References events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextEvent().

// Body of FedRawDataInputSource::reportEventsThisLumiInSource(lumi, events).
// While holding monlock_, looks up `lumi` in sourceEventsReport_ and, when an entry
// exists, adds `events` to its stored count.
// NOTE: source line 1607 (the statement of the `else` branch) is absent from this listing.
1601  {
1602  std::lock_guard<std::mutex> lock(monlock_);
1603  auto itr = sourceEventsReport_.find(lumi);
1604  if (itr != sourceEventsReport_.end())
1605  itr->second += events;
1606  else
1608 }
std::map< unsigned int, unsigned int > sourceEventsReport_
int events

◆ rewind_()

void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 779 of file FedRawDataInputSource.cc.

// Body of FedRawDataInputSource::rewind_(): the override has an empty body and performs no action.
779 {}

◆ setMonState()

void FedRawDataInputSource::setMonState ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1438 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInState(), and edm::InputSource::state().

Referenced by InputFile::advance(), checkNext(), getNextEvent(), and read().

// Body of FedRawDataInputSource::setMonState(state): forwards `state` to
// fms_->setInState() when fms_ is non-null; otherwise does nothing.
1438  {
1439  if (fms_)
1440  fms_->setInState(state);
1441 }
ItemTypeInfo state() const
Definition: InputSource.h:361
evf::FastMonitoringService * fms_
void setInState(FastMonState::InputState inputState)

◆ setMonStateSup()

void FedRawDataInputSource::setMonStateSup ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1443 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInStateSup(), and edm::InputSource::state().

Referenced by readSupervisor().

// Body of FedRawDataInputSource::setMonStateSup(state): guarded by a null check on fms_.
// NOTE: source line 1445 (the guarded statement) is absent from this listing; presumably it
// is the call to fms_->setInStateSup(state) named in this member's References - confirm
// against the source file.
1443  {
1444  if (fms_)
1446 }
void setInStateSup(FastMonState::InputState inputState)
ItemTypeInfo state() const
Definition: InputSource.h:361
evf::FastMonitoringService * fms_

◆ threadError()

void FedRawDataInputSource::threadError ( )
private

Definition at line 1433 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

// Body of FedRawDataInputSource::threadError(): sets quit_threads_ to true and then
// unconditionally throws cms::Exception, so the function never returns normally.
1433  {
1434  quit_threads_ = true;
1435  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1436 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 44 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ InputFile

friend class InputFile
friend

Definition at line 43 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ bufferInputRead_

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

◆ checkEvery_

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by read().

◆ chunkIsFree_

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentFile_

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

◆ currentFileIndex_

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentLumiSection_

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), maybeOpenNewLumiSection(), and read().

◆ cvReader_

std::vector<std::unique_ptr<std::condition_variable> > FedRawDataInputSource::cvReader_
private

◆ cvWakeup_

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

◆ daqDirector_

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = nullptr
private

◆ daqProvenanceHelper_

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

◆ defPath_

std::string FedRawDataInputSource::defPath_
private

Definition at line 88 of file FedRawDataInputSource.h.

◆ detectedFRDversion_

uint16_t FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

◆ event_

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNext(), fillFEDRawDataCollection(), getNextEvent(), and read().

◆ eventChunkBlock_

unsigned int FedRawDataInputSource::eventChunkBlock_
private

◆ eventChunkSize_

unsigned int FedRawDataInputSource::eventChunkSize_
private

◆ eventID_

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by read().

◆ eventRunNumber_

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

◆ eventsThisLumi_

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and read().

◆ eventsThisRun_

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 129 of file FedRawDataInputSource.h.

◆ fileDeleteLock_

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

◆ fileDescriptor_

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

◆ fileListIndex_

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNext(), FedRawDataInputSource(), getFile(), and read().

◆ fileListMode_

const bool FedRawDataInputSource::fileListMode_
private

◆ fileNames_

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile() and initFileList().

◆ fileNamesToDelete_

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 168 of file FedRawDataInputSource.h.

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private

◆ fms_

evf::FastMonitoringService* FedRawDataInputSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

◆ fuOutputDir_

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 114 of file FedRawDataInputSource.h.

◆ getLSFromFilename_

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and readSupervisor().

◆ GTPEventID_

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ L1EventID_

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by checkNext() and read().

◆ loopModeIterationInc_

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ MAXTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ MINTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ monlock_

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 185 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ mReader_

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

◆ mWakeup_

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ numBuffers_

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ numConcurrentReads_

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and readWorker().

◆ processHistoryID_

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

◆ quit_threads_

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

◆ readBlocks_

unsigned int FedRawDataInputSource::readBlocks_
private

◆ readingFilesCount_

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ runNumber_

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNext() and FedRawDataInputSource().

◆ setExceptionState_

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

◆ singleBufferMode_

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 184 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ startupCv_

std::condition_variable FedRawDataInputSource::startupCv_
private

◆ startupLock_

std::mutex FedRawDataInputSource::startupLock_
private

◆ streamFileTracker_

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by read().

◆ tcds_pointer_

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ testTCDSFEDRange_

const std::vector<unsigned int> FedRawDataInputSource::testTCDSFEDRange_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ thread_quit_signal

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

◆ threadInit_

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 182 of file FedRawDataInputSource.h.

◆ tid_active_

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool FedRawDataInputSource::useFileBroker_
private

◆ useL1EventID_

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by read().

◆ verifyChecksum_

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

◆ workerJob_

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by readSupervisor() and readWorker().

◆ workerThreads_

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private