CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemTypeInfo nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSource & operator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemTypeInfo state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &rawData, bool &tcdsInRange)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint16_t detectedFRDversion_ = 0
 
std::unique_ptr< edm::streamer::FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
 
uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
class InputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemPosition : char { ItemPosition::Invalid, ItemPosition::LastItemToBeMerged, ItemPosition::NotLastItemToBeMerged }
 
enum  ItemType : char {
  ItemType::IsInvalid, ItemType::IsStop, ItemType::IsFile, ItemType::IsRun,
  ItemType::IsLumi, ItemType::IsEvent, ItemType::IsRepeat, ItemType::IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 42 of file FedRawDataInputSource.h.

Member Typedef Documentation

◆ ReaderInfo

Definition at line 140 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

◆ FedRawDataInputSource()

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 52 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), MAXTCDSuTCAFEDID_, MINTCDSuTCAFEDID_, numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

54  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
55  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
56  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
57  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
58  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
59  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
60  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
61  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
62  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
64  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
65  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
66  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
67  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
70  eventID_(),
73  tcds_pointer_(nullptr),
74  eventsThisLumi_(0) {
75  char thishost[256];
76  gethostname(thishost, 255);
77  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
78  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
79 
80  if (!testTCDSFEDRange_.empty()) {
81  if (testTCDSFEDRange_.size() != 2) {
82  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
83  << "Invalid TCDS Test FED range parameter";
84  }
87  }
88 
89  long autoRunNumber = -1;
90  if (fileListMode_) {
91  autoRunNumber = initFileList();
92  if (!fileListLoopMode_) {
93  if (autoRunNumber < 0)
94  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
95  //override run number
96  runNumber_ = (edm::RunNumber_t)autoRunNumber;
97  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
98  }
99  }
100 
102  setNewRun();
103  //todo:autodetect from file name (assert if names differ)
105 
106  //make sure that chunk size is N * block size
111 
112  if (!numBuffers_)
113  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
114  << "no reading enabled with numBuffers parameter 0";
115 
118  readingFilesCount_ = 0;
119 
120  if (!crc32c_hw_test())
121  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
122 
123  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
124  if (fileListMode_) {
125  try {
127  } catch (cms::Exception const&) {
128  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
129  }
130  } else {
132  if (!fms_) {
133  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
134  }
135  }
136 
138  if (!daqDirector_)
139  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
140 
142  if (useFileBroker_)
143  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
144  //set DaqDirector to delete files in preGlobalEndLumi callback
146  if (fms_) {
148  fms_->setInputSource(this);
151  }
152  //should delete chunks when run stops
153  for (unsigned int i = 0; i < numBuffers_; i++) {
155  }
156 
157  quit_threads_ = false;
158 
159  //prepare data shared by threads
160  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
161  thread_quit_signal.push_back(false);
162  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
163  cvReader_.push_back(std::make_unique<std::condition_variable>());
164  tid_active_.push_back(0);
165  }
166 
167  //start threads
168  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
169  //wait for each thread to complete initialization
170  std::unique_lock<std::mutex> lk(startupLock_);
171  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
172  startupCv_.wait(lk);
173  }
174 
175  runAuxiliary()->setProcessHistoryID(processHistoryID_);
176 }
std::vector< std::string > fileNames_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
bool useFileBroker() const
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::pair< InputFile *, InputChunk * > ReaderInfo
assert(be >=bs)
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static Timestamp beginOfTime()
Definition: Timestamp.h:77
const std::vector< unsigned int > testTCDSFEDRange_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
std::condition_variable startupCv_
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
void setInState(FastMonState::InputState inputState)

◆ ~FedRawDataInputSource()

FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 178 of file FedRawDataInputSource.cc.

References currentFile_, cvReader_, evf::FastMonitoringService::exceptionDetected(), filesToDelete_, fms_, mps_fire::i, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, mReader_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

178  {
179  quit_threads_ = true;
180 
181  //delete any remaining open files
182  if (!fms_ || !fms_->exceptionDetected()) {
183  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
184  it->second.reset();
185  } else {
186  //skip deleting files with exception
187  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
188  //it->second->unsetDeleteFile();
189  if (fms_->isExceptionOnData(it->second->lumi_))
190  it->second->unsetDeleteFile();
191  else
192  it->second.reset();
193  }
194  //disable deleting current file with exception
195  if (currentFile_.get())
196  if (fms_->isExceptionOnData(currentFile_->lumi_))
197  currentFile_->unsetDeleteFile();
198  }
199 
201  readSupervisorThread_->join();
202  } else {
203  //join aux threads in case the supervisor thread was not started
204  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
205  std::unique_lock<std::mutex> lk(mReader_);
206  thread_quit_signal[i] = true;
207  cvReader_[i]->notify_one();
208  lk.unlock();
209  workerThreads_[i]->join();
210  delete workerThreads_[i];
211  }
212  }
213 }
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
bool isExceptionOnData(unsigned int ls)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::vector< std::thread * > workerThreads_
std::vector< unsigned int > thread_quit_signal
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::unique_ptr< InputFile > currentFile_

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next FedRawDataInputSource::checkNext ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 240 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

240  {
242  //this thread opens new files and dispatches reading to worker readers
243  std::unique_lock<std::mutex> lk(startupLock_);
244  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
246  startupCv_.wait(lk);
247  }
248  //signal hltd to start event accounting
249  if (!currentLumiSection_)
252  switch (nextEvent()) {
254  //maybe create EoL file in working directory before ending run
255  struct stat buf;
256  if (!useFileBroker_ && currentLumiSection_ > 0) {
257  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
258  if (eolFound) {
260  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
261  if (!found) {
263  int eol_fd =
264  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
265  close(eol_fd);
267  }
268  }
269  }
270  //also create EoR file in FU data directory
271  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
272  if (!eorFound) {
273  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
274  O_RDWR | O_CREAT,
275  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
276  close(eor_fd);
277  }
279  eventsThisLumi_ = 0;
281  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
282  return Next::kStop;
283  }
285  //this is not reachable
286  return Next::kEvent;
287  }
289  //std::cout << "--------------NEW LUMI---------------" << std::endl;
290  return Next::kEvent;
291  }
292  default: {
293  if (!getLSFromFilename_) {
294  //get new lumi from file header
295  if (event_->lumi() > currentLumiSection_) {
297  eventsThisLumi_ = 0;
299  }
300  }
303  else
304  eventRunNumber_ = event_->run();
305  L1EventID_ = event_->event();
306 
307  setEventCached();
308 
309  return Next::kEvent;
310  }
311  }
312 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoRFilePathOnFU() const
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:391
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::EvFDaqDirector::FileStatus nextEvent()
std::unique_ptr< edm::streamer::FRDEventMsgView > event_

◆ exceptionState()

bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 70 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance().

◆ fillDescriptions()

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 215 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), and submitPVResolutionJobs::desc.

215  {
217  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
218  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
219  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
220  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
221  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
222  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
223  ->setComment("Maximum number of simultaneously buffered raw files");
224  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
225  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
226  desc.addUntracked<bool>("verifyChecksum", true)
227  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
228  desc.addUntracked<bool>("useL1EventID", false)
229  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
230  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
231  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
232  desc.addUntracked<bool>("fileListMode", false)
233  ->setComment("Use fileNames parameter to directly specify raw files to open");
234  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
235  ->setComment("file list used when fileListMode is enabled");
236  desc.setAllowAnything();
237  descriptions.add("source", desc);
238 }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ fillFEDRawDataCollection()

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData,
bool &  tcdsInRange 
)
private

Definition at line 719 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MAXTCDSuTCAFEDID, MAXTCDSuTCAFEDID_, FEDNumbering::MINTCDSuTCAFEDID, MINTCDSuTCAFEDID_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, l1tstage2_dqm_sourceclient-live_cfg::rawData, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and hcalRecHitTable_cff::time.

Referenced by read().

719  {
721  timeval stv;
722  gettimeofday(&stv, nullptr);
723  time = stv.tv_sec;
724  time = (time << 32) + stv.tv_usec;
725  edm::Timestamp tstamp(time);
726 
727  uint32_t eventSize = event_->eventSize();
728  unsigned char* event = (unsigned char*)event_->payload();
729  GTPEventID_ = 0;
730  tcds_pointer_ = nullptr;
731  tcdsInRange = false;
732  uint16_t selectedTCDSFed = 0;
733  while (eventSize > 0) {
734  assert(eventSize >= FEDTrailer::length);
735  eventSize -= FEDTrailer::length;
736  const FEDTrailer fedTrailer(event + eventSize);
737  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
738  assert(eventSize >= fedSize - FEDHeader::length);
739  eventSize -= (fedSize - FEDHeader::length);
740  const FEDHeader fedHeader(event + eventSize);
741  const uint16_t fedId = fedHeader.sourceID();
743  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
744  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
745  if (!selectedTCDSFed) {
746  selectedTCDSFed = fedId;
747  tcds_pointer_ = event + eventSize;
749  tcdsInRange = true;
750  }
751  } else
752  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
753  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
754  }
756  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
757  GTPEventID_ = evf::evtn::get(event + eventSize, true);
758  else
759  GTPEventID_ = evf::evtn::get(event + eventSize, false);
760  //evf::evtn::evm_board_setformat(fedSize);
761  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
762  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
763  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
764  }
765  //take event ID from GTPE FED
767  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
768  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
769  }
770  }
771  FEDRawData& fedData = rawData.FEDData(fedId);
772  fedData.resize(fedSize);
773  memcpy(fedData.data(), event + eventSize, fedSize);
774  }
775  assert(eventSize == 0);
776 
777  return tstamp;
778 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
static const uint32_t length
Definition: FEDTrailer.h:57
unsigned int get(const unsigned char *, bool)
static const uint32_t length
Definition: FEDHeader.h:54
assert(be >=bs)
unsigned long long TimeValue_t
Definition: Timestamp.h:21
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:13
void resize(size_t newsize, size_t wordsize=8)
Definition: FEDRawData.cc:28
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
std::unique_ptr< edm::streamer::FRDEventMsgView > event_
Definition: event.py:1

◆ getEventReport()

std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1611 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

// Looks up the entry stored for `lumi` in sourceEventsReport_ while holding monlock_.
// Returns {true, stored value} when an entry exists, optionally removing it when `erase`
// is set; returns {false, 0} when there is no entry for `lumi`.
1611  {
1612  std::lock_guard<std::mutex> lock(monlock_);
1613  auto itr = sourceEventsReport_.find(lumi);
1614  if (itr != sourceEventsReport_.end()) {
// copy the value out first: the iterator is no longer usable after erase()
1615  std::pair<bool, unsigned int> ret(true, itr->second);
1616  if (erase)
1617  sourceEventsReport_.erase(itr);
1618  return ret;
1619  } else
1620  return std::pair<bool, unsigned int>(false, 0);
1621 }
ret
prodAgent to be discontinued
std::map< unsigned int, unsigned int > sourceEventsReport_

◆ getFile()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1658 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

// NOTE: this listing is incomplete; source lines 1658-1660, 1683, 1686 and 1689 are not
// shown on this page, so the return statements of both branches are not visible here.
//
// File-list variant of file discovery: takes the next entry of fileNames_ (indexed by
// fileListIndex_), strips a leading "file://" or "file:" scheme, writes the result to
// `nextFile`, and derives the lumisection number `ls` from the file stem.
// `fsize` and `lockWaitTime` are not assigned in the lines shown (see the commented-out
// assignments below).
1661  {
1662  if (fileListIndex_ < fileNames_.size()) {
1663  nextFile = fileNames_[fileListIndex_];
1664  if (nextFile.find("file://") == 0)
1665  nextFile = nextFile.substr(7);
1666  else if (nextFile.find("file:") == 0)
1667  nextFile = nextFile.substr(5);
1668  std::filesystem::path fileName = nextFile;
1669  std::string fileStem = fileName.stem().string();
// NOTE(review): std::string::find returns npos (non-zero) when the pattern is absent and 0
// when it is at the very start, so this condition is true for "not found" and false for
// "found at position 0". With "ls" absent, find()+2 wraps around to 1 and the first
// character is dropped. Confirm whether `!= std::string::npos` was intended.
1670  if (fileStem.find("ls"))
1671  fileStem = fileStem.substr(fileStem.find("ls") + 2);
// same find()-as-boolean pattern; with '_' absent substr(0, npos) leaves the stem unchanged
1672  if (fileStem.find('_'))
1673  fileStem = fileStem.substr(0, fileStem.find('_'));
1674 
1675  if (!fileListLoopMode_)
// std::stoul throws if the remaining stem does not start with a number
1676  ls = std::stoul(fileStem);
1677  else //always starting from LS 1 in loop mode
1678  ls = 1 + loopModeIterationInc_;
1679 
1680  //fsize = 0;
1681  //lockWaitTime = 0;
1682  fileListIndex_++;
1684  } else {
1685  if (!fileListLoopMode_)
1687  else {
1688  //loop through files until interrupted
1690  fileListIndex_ = 0;
// NOTE(review): with an empty fileNames_ in loop mode this call would recurse without
// bound, unless the line hidden from this listing (1689) prevents it. Confirm.
1691  return getFile(ls, nextFile, fsize, lockWaitTime);
1692  }
1693  }
1694 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls(path, rec=False)
Definition: eostools.py:349

◆ getNextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 360 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, edm::streamer::FRDHeaderMaxVersion, edm::streamer::FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, setMonState(), singleBufferMode_, mps_update::status, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

// NOTE: this listing is incomplete; many source lines (e.g. 364-365, 372, 376-377, 384,
// 397, 401, 416, 443, 489-491, 495, 503, 524, 553, 584, 640) are not shown on this page,
// including several return statements and the declaration of `status`.
//
// Prepares the next event from currentFile_ and points event_ at it.
// Visible steps: fetch a file from fileQueue_ when none is current; handle empty and fully
// read files; skip an optional raw file header; locate the event header and payload, in
// single-buffer mode (always chunks_[0]) or multi-buffer mode (advance() across chunks);
// optionally verify the payload checksum; finally increment nProcessed_.
// Throws cms::Exception on truncated input, oversized events, unknown FRD version,
// checksum mismatch, event-count mismatch and run abort.
360  {
361  if (setExceptionState_)
362  threadError();
363  if (!currentFile_.get()) {
366  {
367  IdleSourceSentry ids(fms_);
368  if (!fileQueue_.try_pop(currentFile_)) {
369  //sleep until wakeup (only in single-buffer mode) or timeout
370  std::unique_lock<std::mutex> lkw(mWakeup_);
371  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
373  }
374  }
375  status = currentFile_->status_;
378  currentFile_.reset();
379  return status;
380  } else if (status == evf::EvFDaqDirector::runAbort) {
381  throw cms::Exception("FedRawDataInputSource::getNextEvent")
382  << "Run has been aborted by the input source reader thread";
383  } else if (status == evf::EvFDaqDirector::newLumi) {
385  if (getLSFromFilename_) {
386  if (currentFile_->lumi_ > currentLumiSection_) {
388  eventsThisLumi_ = 0;
390  }
391  } else { //let this be picked up from next event
393  }
394  currentFile_.reset();
395  return status;
396  } else if (status == evf::EvFDaqDirector::newFile) {
398  } else
399  assert(false);
400  }
402 
403  //file is empty
404  if (!currentFile_->fileSize_) {
406  //try to open new lumi
407  assert(currentFile_->nChunks_ == 0);
408  if (getLSFromFilename_)
409  if (currentFile_->lumi_ > currentLumiSection_) {
411  eventsThisLumi_ = 0;
413  }
414  //immediately delete empty file
415  currentFile_.reset();
417  }
418 
419  //file is finished
420  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
422  //release last chunk (it is never released elsewhere)
423  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
// a negative nEvents_ skips the event-count consistency check
424  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
425  throw cms::Exception("FedRawDataInputSource::getNextEvent")
426  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
427  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
428  }
429  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
430  if (singleBufferMode_) {
431  std::unique_lock<std::mutex> lkw(mWakeup_);
432  cvWakeup_.notify_one();
433  }
434  bufferInputRead_ = 0;
436  //put the file in pending delete list;
437  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
438  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
439  } else {
440  //in single-thread and stream jobs, events are already processed
441  currentFile_.reset();
442  }
444  }
445 
446  //handle RAW file header
447  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
448  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
449  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
450  throw cms::Exception("FedRawDataInputSource::getNextEvent")
451  << "Premature end of input file while reading file header";
452 
453  edm::LogWarning("FedRawDataInputSource")
454  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
455  if (getLSFromFilename_)
456  if (currentFile_->lumi_ > currentLumiSection_) {
458  eventsThisLumi_ = 0;
460  }
461  }
462 
463  //advance buffer position to skip file header (chunk will be acquired later)
464  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
465  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
466  }
467 
468  //file is too short
469  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
470  throw cms::Exception("FedRawDataInputSource::getNextEvent")
471  << "Premature end of input file while reading event header";
472  }
473  if (singleBufferMode_) {
474  //should already be there
476  {
477  IdleSourceSentry ids(fms_);
478  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
479  usleep(10000);
480  if (currentFile_->parent_->exceptionState() || setExceptionState_)
481  currentFile_->parent_->threadError();
482  }
483  }
485 
// single-buffer mode addresses chunks_[0] throughout this branch
486  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
487 
488  //conditions when read amount is not sufficient for the header to fit
492 
493  if (detectedFRDversion_ == 0) {
// the first 16 bits at the event start are read as the FRD version number
494  detectedFRDversion_ = *((uint16_t*)dataPosition);
496  throw cms::Exception("FedRawDataInputSource::getNextEvent")
497  << "Unknown FRD version -: " << detectedFRDversion_;
499  }
500 
501  //recalculate chunk position
502  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event header";
506  }
507  }
508 
509  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
510  if (event_->size() > eventChunkSize_) {
511  throw cms::Exception("FedRawDataInputSource::getNextEvent")
512  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
513  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
514  << " bytes";
515  }
516 
// msgSize is the event size without its FRD header
517  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
518 
519  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
520  throw cms::Exception("FedRawDataInputSource::getNextEvent")
521  << "Premature end of input file while reading event data";
522  }
523  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
525  //recalculate chunk position
526  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
527  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
528  }
529  currentFile_->bufferPosition_ += event_->size();
530  currentFile_->chunkPosition_ += event_->size();
531  //last chunk is released when this function is invoked next time
532 
533  }
534  //multibuffer mode:
535  else {
536  //wait for the current chunk to become added to the vector
538  {
539  IdleSourceSentry ids(fms_);
540  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
541  usleep(10000);
542  if (setExceptionState_)
543  threadError();
544  }
545  }
547 
548  //check if header is at the boundary of two chunks
549  chunkIsFree_ = false;
550  unsigned char* dataPosition;
551 
552  //read header, copy it to a single chunk if necessary
554  throw cms::Exception("FedRawDataInputSource::getNextEvent")
555  << "Premature end of input file (missing:"
557  << ") while reading event data for next event header";
558  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
559 
560  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
561  if (event_->size() > eventChunkSize_) {
562  throw cms::Exception("FedRawDataInputSource::getNextEvent")
563  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
564  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
565  << " bytes";
566  }
567 
568  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
569 
570  if (currentFile_->fileSizeLeft() < msgSize) {
571  throw cms::Exception("FedRawDataInputSource::getNextEvent")
572  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
573  << ") while reading event data for event " << event_->event() << " lumi:" << event_->lumi();
574  }
575 
576  if (chunkEnd) {
577  //header was at the chunk boundary, we will have to move payload as well
578  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
579  chunkIsFree_ = true;
580  } else {
581  //header was contiguous, but check if payload fits the chunk
582  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
583  //rewind to header start position
585  //copy event to a chunk start and move pointers
586 
588  {
589  IdleSourceSentry ids(fms_);
590  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
591  }
593 
594  assert(chunkEnd);
595  chunkIsFree_ = true;
596  //header is moved
597  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
598  } else {
599  //everything is in a single chunk, only move pointers forward
600  chunkEnd = currentFile_->advance(dataPosition, msgSize);
601  assert(!chunkEnd);
602  chunkIsFree_ = false;
603  }
604  }
605  //sanity check that the buffer position has not exceeded file size after preparing event
606  if (currentFile_->fileSize_ < currentFile_->bufferPosition_) {
607  throw cms::Exception("FedRawDataInputSource::getNextEvent")
608  << "Exceeded file size by " << currentFile_->bufferPosition_ - currentFile_->fileSize_
609  << " after reading last event declared size of " << event_->size() << " bytes";
610  }
611  } //end multibuffer mode
613 
// checksum verification (only when verifyChecksum_ is set): crc32c for event version >= 5,
// otherwise Adler32 for version >= 3; lower versions are not checked
614  if (verifyChecksum_ && event_->version() >= 5) {
615  uint32_t crc = 0;
616  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
617  if (crc != event_->crc32c()) {
618  if (fms_)
620  throw cms::Exception("FedRawDataInputSource::getNextEvent")
621  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
622  << crc;
623  }
624  } else if (verifyChecksum_ && event_->version() >= 3) {
625  uint32_t adler = adler32(0L, Z_NULL, 0);
626  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
627 
628  if (adler != event_->adler32()) {
629  if (fms_)
631  throw cms::Exception("FedRawDataInputSource::getNextEvent")
632  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
633  << adler;
634  }
635  }
637 
638  currentFile_->nProcessed_++;
639 
641 }
std::condition_variable cvWakeup_
void setExceptionDetected(unsigned int ls)
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
constexpr size_t FRDHeaderMaxVersion
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void setMonState(evf::FastMonState::InputState state)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511
std::unique_ptr< edm::streamer::FRDEventMsgView > event_

◆ initFileList()

long FedRawDataInputSource::initFileList ( )
private

Definition at line 1623 of file FedRawDataInputSource.cc.

References a, b, mps_fire::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, fileNames_, castor_dqm_sourceclient_file_cfg::path, jetUpdater_cfi::sort, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

// NOTE: source line 1634, where `fileName` is declared, is not shown in this listing.
//
// Sorts fileNames_ in ascending order of the path portion starting at the last '/'
// (the whole string when there is no '/'), then tries to auto-detect the run number from
// the first file: the stem must begin with "run", and the characters between "run" and the
// first '_' are parsed with std::stol.
// Returns the parsed run number, or -1 when fileNames_ is empty, the stem does not start
// with "run", or the number cannot be parsed (a warning is logged in the last case).
1623  {
// the comparator takes its arguments by value because it trims them before comparing
1624  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1625  if (a.rfind('/') != std::string::npos)
1626  a = a.substr(a.rfind('/'));
1627  if (b.rfind('/') != std::string::npos)
1628  b = b.substr(b.rfind('/'));
1629  return b > a;
1630  });
1631 
1632  if (!fileNames_.empty()) {
1633  //get run number from first file in the vector
1635  std::string fileStem = fileName.stem().string();
1636  if (fileStem.find("file://") == 0)
1637  fileStem = fileStem.substr(7);
1638  else if (fileStem.find("file:") == 0)
1639  fileStem = fileStem.substr(5);
// `end` is npos when the stem has no '_'; substr() then takes the rest of the string
1640  auto end = fileStem.find('_');
1641 
1642  if (fileStem.find("run") == 0) {
1643  std::string runStr = fileStem.substr(3, end - 3);
1644  try {
1645  //get long to support test run numbers < 2^32
1646  long rval = std::stol(runStr);
1647  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1648  return rval;
1649  } catch (const std::exception&) {
// parse failures are not fatal: warn and fall through to the -1 return
1650  edm::LogWarning("FedRawDataInputSource")
1651  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1652  }
1653  }
1654  }
1655  return -1;
1656 }
std::vector< std::string > fileNames_
Log< level::Info, false > LogInfo
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
Log< level::Warning, false > LogWarning

◆ maybeOpenNewLumiSection()

void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 314 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNext(), and getNextEvent().

// NOTE: source lines 318, 322, 327, 335 and 341 are not shown in this listing; among them
// are the declarations of `fuEoLS` and `lumiBlockAuxiliary`.
//
// Does nothing unless there is no current luminosity-block auxiliary or its lumi number
// differs from `lumiSection`. Otherwise:
//  - when useFileBroker_ is false, creates the `fuEoLS` file for the previous lumisection
//    if it does not exist yet (only when currentLumiSection_ > 0) and calls
//    daqDirector_->createBoLSFile() for the new one;
//  - sets currentLumiSection_ to `lumiSection`;
//  - installs a new luminosity-block auxiliary whose begin time is the current wall-clock
//    time and whose end time is invalid, and sets its process history ID.
314  {
315  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
316  if (!useFileBroker_) {
317  if (currentLumiSection_ > 0) {
319  struct stat buf;
// stat() succeeding is used purely as an existence test for the path in fuEoLS
320  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
321  if (!found) {
// NOTE(review): the descriptor returned by open() is closed without checking for -1
// in the lines shown. Confirm that a failed create is acceptable here.
323  int eol_fd =
324  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
325  close(eol_fd);
326  daqDirector_->createBoLSFile(lumiSection, false);
328  }
329  } else
330  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
331  }
332 
333  currentLumiSection_ = lumiSection;
334 
336 
337  timeval tv;
338  gettimeofday(&tv, nullptr);
// seconds * 1000000 + microseconds: the value passed to edm::Timestamp is in microseconds
339  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
340 
342  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
343 
344  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
345  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
346 
347  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
348  }
349 }
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
std::string getEoLSFilePathOnFU(const unsigned int ls) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
edm::ProcessHistoryID processHistoryID_
Log< level::Info, false > LogInfo
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
evf::EvFDaqDirector * daqDirector_
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const

◆ nextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 351 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNext().

// NOTE: source lines 352-353 are not shown in this listing; they contain the declaration
// of `status` and the head of the loop whose body is visible below.
// Presumably the hidden loop keeps calling getNextEvent() while it reports noFile (both
// appear in this function's reference list). Verify against the source file.
//
// Visible behaviour: the loop is left early once edm::shutdown_flag is set, and the last
// `status` value is returned.
351  {
// relaxed load: the flag is only polled here, no other memory ordering is requested
354  if (edm::shutdown_flag.load(std::memory_order_relaxed))
355  break;
356  }
357  return status;
358 }
volatile std::atomic< bool > shutdown_flag
def load(fileName)
Definition: svgfig.py:547
evf::EvFDaqDirector::FileStatus getNextEvent()

◆ read()

void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 643 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, runTheMatrix::const, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), event_, eventID_, eventRunNumber_, eventsThisLumi_, Exception, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, evf::FastMonitoringService::isExceptionOnData(), ALPAKA_ACCELERATOR_NAMESPACE::vertexFinder::it, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), l1tstage2_dqm_sourceclient-live_cfg::rawData, setMonState(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), readNextChunkIntoBuffer(), and readWorker().

// NOTE: this listing is incomplete; source lines 644, 650-651, 659-660, 666-669, 673, 681,
// 684 and 715 are not shown on this page, including the declarations of `aux`.
//
// Builds the FEDRawDataCollection for the current event and creates the event in
// `eventPrincipal`. The event auxiliary is built in one of three ways:
//  - useL1EventID_ set;
//  - no TCDS FED found (tcds_pointer_ == nullptr): requires a non-zero GTPEventID_,
//    otherwise throws cms::Exception;
//  - TCDS FED found: the header and TCDS record are read from tcds_pointer_.
// Afterwards it counts the event, records which file this stream is reading, periodically
// drops finished files that no stream is using, and releases a chunk flagged as free.
643  {
645  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
646  bool tcdsInRange;
647  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
648 
649  if (useL1EventID_) {
652  aux.setProcessHistoryID(processHistoryID_);
653  makeEvent(eventPrincipal, aux);
654  } else if (tcds_pointer_ == nullptr) {
655  if (!GTPEventID_) {
656  throw cms::Exception("FedRawDataInputSource::read")
657  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
658  }
661  aux.setProcessHistoryID(processHistoryID_);
662  makeEvent(eventPrincipal, aux);
663  } else {
664  const FEDHeader fedHeader(tcds_pointer_);
// the TCDS record is read starting FEDHeader::length bytes after tcds_pointer_
665  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
670  event_->isRealData(),
671  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
672  processGUID(),
674  !tcdsInRange);
675  aux.setProcessHistoryID(processHistoryID_);
676  makeEvent(eventPrincipal, aux);
677  }
678 
679  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
680 
682 
683  eventsThisLumi_++;
685 
686  //resize vector if needed
687  while (streamFileTracker_.size() <= eventPrincipal.streamID())
688  streamFileTracker_.push_back(-1);
689 
690  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
691 
692  //this old file check runs only when (nProcessed_ - 1) of the current file is a multiple of checkEvery_
693  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
694  //delete files that are not in processing
695  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
696  auto it = filesToDelete_.begin();
697  while (it != filesToDelete_.end()) {
698  bool fileIsBeingProcessed = false;
699  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
700  if (it->first == streamFileTracker_.at(i)) {
701  fileIsBeingProcessed = true;
702  break;
703  }
704  }
// files from a lumisection flagged by fms_->isExceptionOnData() are kept in the list
705  if (!fileIsBeingProcessed && (!fms_ || !fms_->isExceptionOnData(it->second->lumi_))) {
// NOTE(review): fileToDelete is not used in the lines shown here
706  std::string fileToDelete = it->second->fileName_;
// erasing the list entry destroys the owned InputFile (held by unique_ptr)
707  it = filesToDelete_.erase(it);
708  } else
709  it++;
710  }
711  }
// a chunk flagged free by getNextEvent() is returned to the pool; note the index is currentChunk_ - 1
712  if (chunkIsFree_)
713  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
714  chunkIsFree_ = false;
716  return;
717 }
static const uint32_t length
Definition: FEDHeader.h:54
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: TCDSRaw.h:16
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:226
std::vector< int > streamFileTracker_
ProductProvenance const & dummyProvenance() const
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
void setMonState(evf::FastMonState::InputState state)
BranchDescription const & branchDescription() const
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
evf::FastMonitoringService * fms_
tbb::concurrent_queue< InputChunk * > freeChunks_
std::unique_ptr< InputFile > currentFile_
def move(src, dest)
Definition: eostools.py:511
std::unique_ptr< edm::streamer::FRDEventMsgView > event_

◆ readNextChunkIntoBuffer()

void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1531 of file FedRawDataInputSource.cc.

References bufferInputRead_, eventChunkBlock_, eventChunkSize_, Exception, geometryDiff::file, fileDescriptor_, mps_fire::i, dqmdumpme::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

// NOTE: source lines 1559, 1568, 1583 and 1588 are not shown in this listing; each one
// directly follows a ::read() call.
//
// Fills file->chunks_[0] with the next portion of the input file.
//  - No file open yet (fileDescriptor_ < 0): opens file->fileName_ (or adopts file->rawFd_),
//    skips the raw header, and reads up to readBlocks_ blocks of eventChunkBlock_ bytes.
//  - File already open and chunkPosition_ == 0: reads a full set of blocks from the start
//    of the buffer.
//  - File already open otherwise: moves the unread tail of the buffer to its start and
//    reads as many bytes as were freed (chunkPosition_ bytes, in block-sized pieces).
// The descriptor is closed once bufferInputRead_ equals file->fileSize_.
// Throws cms::Exception when the file cannot be opened.
1531  {
1532  uint32_t existingSize = 0;
1533 
1534  if (fileDescriptor_ < 0) {
1535  bufferInputRead_ = 0;
1536  if (file->rawFd_ == -1) {
1537  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
// NOTE(review): lseek() runs before the open() result is checked (the check is at 1547)
1538  if (file->rawHeaderSize_)
1539  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1540  } else
1541  fileDescriptor_ = file->rawFd_;
1542 
1543  //skip header size in destination buffer (chunk position was already adjusted)
1544  bufferInputRead_ += file->rawHeaderSize_;
1545  existingSize += file->rawHeaderSize_;
1546 
1547  if (fileDescriptor_ >= 0)
1548  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1549  else {
1550  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1551  << "failed to open file " << std::endl
1552  << file->fileName_ << " fd:" << fileDescriptor_;
1553  }
1554  //fill chunk (skipping file header if present)
1555  for (unsigned int i = 0; i < readBlocks_; i++) {
// NOTE(review): on the last block the request is shortened by existingSize, which by then
// also includes every byte read in earlier iterations (see 1560), not only the header
// size. Confirm that this is the intended size when readBlocks_ > 1.
1556  const ssize_t last = ::read(fileDescriptor_,
1557  (void*)(file->chunks_[0]->buf_ + existingSize),
1558  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
// NOTE(review): `last` is added without a visible check for -1 (line 1559 is hidden)
1560  existingSize += last;
1561  }
1562 
1563  } else {
1564  //continue reading
1565  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1566  for (unsigned int i = 0; i < readBlocks_; i++) {
1567  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1569  existingSize += last;
1570  }
1571  } else {
1572  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1573  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
// memmove rather than memcpy: source and destination lie in the same buffer
1574  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1575 
1576  //calculate amount of data that can be added
// the freed space equals chunkPosition_ bytes: whole blocks first, then the remainder
1577  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1578  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1579 
1580  for (uint32_t i = 0; i < blockcount; i++) {
1581  const ssize_t last =
1582  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1584  existingSizeLeft += last;
1585  }
1586  if (leftsize) {
1587  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1589  }
1590  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1591  }
1592  }
1593  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1594  if (fileDescriptor_ != -1) {
1595  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1596  close(fileDescriptor_);
// both descriptors are reset so the next call opens a new file
1597  file->rawFd_ = fileDescriptor_ = -1;
1598  }
1599  }
1600 }
void read(edm::EventPrincipal &eventPrincipal) override
#define LogDebug(id)

◆ readSupervisor()

void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 782 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, cms::cuda::assert(), cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), evf::EvFDaqDirector::grabNextJsonFromRaw(), mps_fire::i, dqmiodatasetharvest::inf, InputFile, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, createfilelist::int, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), evf::EvFDaqDirector::lumisectionDiscarded(), maxBufferedFiles_, SiStripPI::min, eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::numConcurrentLumis(), evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), 
AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::unlockFULocal(), evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

782  {
783  bool stop = false;
784  unsigned int currentLumiSection = 0;
785 
786  {
787  std::unique_lock<std::mutex> lk(startupLock_);
788  startupCv_.notify_one();
789  }
790 
791  uint32_t ls = 0;
792  uint32_t monLS = 1;
793  uint32_t lockCount = 0;
794  uint64_t sumLockWaitTimeUs = 0.;
795 
796  while (!stop) {
797  //wait for at least one free thread and chunk
798  int counter = 0;
799 
800  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
802  //report state to monitoring
803  if (fms_) {
804  bool copy_active = false;
805  for (auto j : tid_active_)
806  if (j)
807  copy_active = true;
810  else if (freeChunks_.empty()) {
811  if (copy_active)
813  else
815  } else {
816  if (copy_active)
818  else
820  }
821  }
822  std::unique_lock<std::mutex> lkw(mWakeup_);
823  //sleep until woken up by condition or a timeout
824  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
825  counter++;
826  if (!(counter % 6000)) {
827  edm::LogWarning("FedRawDataInputSource")
828  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
829  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
830  << " / " << maxBufferedFiles_;
831  }
832  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
833  } else {
834  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
835  }
836  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
837  stop = true;
838  break;
839  }
840  }
841  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
842 
843  if (stop)
844  break;
845 
846  //look for a new file
847  std::string nextFile;
848  uint32_t fileSizeIndex;
849  int64_t fileSizeFromMetadata;
850 
851  if (fms_) {
854  }
855 
857  uint16_t rawHeaderSize = 0;
858  uint32_t lsFromRaw = 0;
859  int32_t serverEventsInNewFile = -1;
860  int rawFd = -1;
861 
862  int backoff_exp = 0;
863 
864  //entering loop which tries to grab new file from ramdisk
866  //check if hltd has signalled to throttle input
867  counter = 0;
868  while (daqDirector_->inputThrottled()) {
869  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
870  break;
871 
872  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
873  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
874  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
875  bool hasDiscardedLumi = false;
876  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
878  edm::LogWarning("FedRawDataInputSource") << "Source detected that the lumisection is discarded -: " << i;
879  hasDiscardedLumi = true;
880  break;
881  }
882  }
883  if (hasDiscardedLumi)
884  break;
885 
887 
888  if (!(counter % 50))
889  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
890  usleep(100000);
891  counter++;
892  }
893 
894  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
895  stop = true;
896  break;
897  }
898 
899  assert(rawFd == -1);
900  uint64_t thisLockWaitTimeUs = 0.;
902  if (fileListMode_) {
903  //return LS if LS not set, otherwise return file
904  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
906  uint16_t rawDataType;
908  rawFd,
909  rawHeaderSize,
910  rawDataType,
911  lsFromRaw,
912  serverEventsInNewFile,
913  fileSizeFromMetadata,
914  false,
915  false,
916  false) != 0) {
917  //error
918  setExceptionState_ = true;
919  stop = true;
920  break;
921  }
922  if (!getLSFromFilename_)
923  ls = lsFromRaw;
924  }
925  } else if (!useFileBroker_)
927  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
928  else {
929  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
930  ls,
931  nextFile,
932  rawFd,
933  rawHeaderSize,
934  serverEventsInNewFile,
935  fileSizeFromMetadata,
936  thisLockWaitTimeUs);
937  }
938 
940 
941  //cycle through all remaining LS even if no files get assigned
942  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
944 
945  //monitoring of lock wait time
946  if (thisLockWaitTimeUs > 0.)
947  sumLockWaitTimeUs += thisLockWaitTimeUs;
948  lockCount++;
949  if (ls > monLS) {
950  monLS = ls;
951  if (lockCount)
952  if (fms_)
953  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
954  lockCount = 0;
955  sumLockWaitTimeUs = 0;
956  }
957 
958  //check again for any remaining index/EoLS files after EoR file is seen
961  usleep(100000);
962  //now all files should have appeared in ramdisk, check again if any raw files were left behind
964  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
965  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
967  }
968 
970  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
971  fileQueue_.push(std::move(inf));
972  stop = true;
973  break;
974  }
975 
976  //error from filelocking function
978  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
979  fileQueue_.push(std::move(inf));
980  stop = true;
981  break;
982  }
983  //queue new lumisection
984  if (getLSFromFilename_) {
985  if (ls > currentLumiSection) {
986  if (!useFileBroker_) {
987  //file locking
988  //setMonStateSup(inSupNewLumi);
989  currentLumiSection = ls;
990  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
991  fileQueue_.push(std::move(inf));
992  } else {
993  //new file service
994  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
996  //start transitions from LS specified by env, continue if not reached
997  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
998  //skip file if from earlier LS than specified by env
999  if (rawFd != -1) {
1000  close(rawFd);
1001  rawFd = -1;
1002  }
1004  continue;
1005  } else {
1006  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1007  fileQueue_.push(std::move(inf));
1008  }
1009  } else if (ls < 100) {
1010  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
1011  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
1012 
1013  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
1014  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1015  fileQueue_.push(std::move(inf));
1016  }
1017  } else {
1018  //start from current LS
1019  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
1020  fileQueue_.push(std::move(inf));
1021  }
1022  } else {
1023  //queue all lumisections after last one seen to avoid gaps
1024  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
1025  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
1026  fileQueue_.push(std::move(inf));
1027  }
1028  }
1029  currentLumiSection = ls;
1030  }
1031  }
1032  //else
1033  if (currentLumiSection > 0 && ls < currentLumiSection) {
1034  edm::LogError("FedRawDataInputSource")
1035  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
1036  << ". Aborting execution." << std::endl;
1037  if (rawFd != -1)
1038  close(rawFd);
1039  rawFd = -1;
1040  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
1041  fileQueue_.push(std::move(inf));
1042  stop = true;
1043  break;
1044  }
1045  }
1046 
1047  int dbgcount = 0;
1050  dbgcount++;
1051  if (!(dbgcount % 20))
1052  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
1053  if (!useFileBroker_)
1054  usleep(100000);
1055  else {
1056  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1057  //backoff_exp=0; // disabled!
1058  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1059  usleep(sleeptime);
1060  backoff_exp++;
1061  }
1062  } else
1063  backoff_exp = 0;
1064  }
1065  //end of file grab loop, parse result
1068  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1069 
1070  std::string rawFile;
1071  //file service will report raw extension
1072  if (useFileBroker_ || rawHeaderSize)
1073  rawFile = nextFile;
1074  else {
1075  std::filesystem::path rawFilePath(nextFile);
1076  rawFile = rawFilePath.replace_extension(".raw").string();
1077  }
1078 
1079  struct stat st;
1080  int stat_res = stat(rawFile.c_str(), &st);
1081  if (stat_res == -1) {
1082  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1083  setExceptionState_ = true;
1084  break;
1085  }
1086  uint64_t fileSize = st.st_size;
1087 
1088  if (fms_) {
1092  }
1093  int eventsInNewFile;
1094  if (fileListMode_) {
1095  if (fileSize == 0)
1096  eventsInNewFile = 0;
1097  else
1098  eventsInNewFile = -1;
1099  } else {
1101  if (!useFileBroker_) {
1102  if (rawHeaderSize) {
1103  int rawFdEmpty = -1;
1104  uint16_t rawHeaderCheck;
1105  bool fileFound;
1106  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1107  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1108  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1110  } else
1111  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1112  } else
1113  eventsInNewFile = serverEventsInNewFile;
1114  assert(eventsInNewFile >= 0);
1115  assert((eventsInNewFile > 0) ==
1116  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1117  }
1118 
1119  if (!singleBufferMode_) {
1120  //calculate number of needed chunks
1121  unsigned int neededChunks = fileSize / eventChunkSize_;
1122  if (fileSize % eventChunkSize_)
1123  neededChunks++;
1124 
1125  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1126  ls,
1127  rawFile,
1128  !fileListMode_,
1129  rawFd,
1130  fileSize,
1131  rawHeaderSize,
1132  neededChunks,
1133  eventsInNewFile,
1134  this));
1136  auto newInputFilePtr = newInputFile.get();
1137  fileQueue_.push(std::move(newInputFile));
1138 
1139  for (unsigned int i = 0; i < neededChunks; i++) {
1140  if (fms_) {
1141  bool copy_active = false;
1142  for (auto j : tid_active_)
1143  if (j)
1144  copy_active = true;
1145  if (copy_active)
1147  else
1149  }
1150  //get thread
1151  unsigned int newTid = 0xffffffff;
1152  while (!workerPool_.try_pop(newTid)) {
1153  usleep(100000);
1154  if (quit_threads_.load(std::memory_order_relaxed)) {
1155  stop = true;
1156  break;
1157  }
1158  }
1159 
1160  if (fms_) {
1161  bool copy_active = false;
1162  for (auto j : tid_active_)
1163  if (j)
1164  copy_active = true;
1165  if (copy_active)
1167  else
1169  }
1170  InputChunk* newChunk = nullptr;
1171  while (!freeChunks_.try_pop(newChunk)) {
1172  usleep(100000);
1173  if (quit_threads_.load(std::memory_order_relaxed)) {
1174  stop = true;
1175  break;
1176  }
1177  }
1178 
1179  if (newChunk == nullptr) {
1180  //return unused tid if we received shutdown (nullptr chunk)
1181  if (newTid != 0xffffffff)
1182  workerPool_.push(newTid);
1183  stop = true;
1184  break;
1185  }
1186  if (stop)
1187  break;
1189 
1190  std::unique_lock<std::mutex> lk(mReader_);
1191 
1192  unsigned int toRead = eventChunkSize_;
1193  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1194  toRead = fileSize % eventChunkSize_;
1195  newChunk->reset(i * eventChunkSize_, toRead, i);
1196 
1197  workerJob_[newTid].first = newInputFilePtr;
1198  workerJob_[newTid].second = newChunk;
1199 
1200  //wake up the worker thread
1201  cvReader_[newTid]->notify_one();
1202  }
1203  } else {
1204  if (!eventsInNewFile) {
1205  if (rawFd) {
1206  close(rawFd);
1207  rawFd = -1;
1208  }
1209  //still queue file for lumi update
1210  std::unique_lock<std::mutex> lkw(mWakeup_);
1211  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1212  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1213  ls,
1214  rawFile,
1215  !fileListMode_,
1216  rawFd,
1217  fileSize,
1218  rawHeaderSize,
1219  (rawHeaderSize > 0),
1220  0,
1221  this));
1223  fileQueue_.push(std::move(newInputFile));
1224  cvWakeup_.notify_one();
1225  break;
1226  }
1227  //in single-buffer mode put single chunk in the file and let the main thread read the file
1228  InputChunk* newChunk = nullptr;
1229  //should be available immediately
1230  while (!freeChunks_.try_pop(newChunk)) {
1231  usleep(100000);
1232  if (quit_threads_.load(std::memory_order_relaxed)) {
1233  stop = true;
1234  break;
1235  }
1236  }
1237 
1238  if (newChunk == nullptr) {
1239  stop = true;
1240  }
1241 
1242  if (stop)
1243  break;
1244 
1245  std::unique_lock<std::mutex> lkw(mWakeup_);
1246 
1247  unsigned int toRead = eventChunkSize_;
1248  if (fileSize % eventChunkSize_)
1249  toRead = fileSize % eventChunkSize_;
1250  newChunk->reset(0, toRead, 0);
1251  newChunk->readComplete_ = true;
1252 
1253  //push file and wakeup main thread
1254  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1255  ls,
1256  rawFile,
1257  !fileListMode_,
1258  rawFd,
1259  fileSize,
1260  rawHeaderSize,
1261  1,
1262  eventsInNewFile,
1263  this));
1264  newInputFile->chunks_[0] = newChunk;
1266  fileQueue_.push(std::move(newInputFile));
1267  cvWakeup_.notify_one();
1268  }
1269  }
1270  }
1272  //make sure threads finish reading
1273  unsigned numFinishedThreads = 0;
1274  while (numFinishedThreads < workerThreads_.size()) {
1275  unsigned tid = 0;
1276  while (!workerPool_.try_pop(tid)) {
1277  usleep(10000);
1278  }
1279  std::unique_lock<std::mutex> lk(mReader_);
1280  thread_quit_signal[tid] = true;
1281  cvReader_[tid]->notify_one();
1282  numFinishedThreads++;
1283  }
1284  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1285  workerThreads_[i]->join();
1286  delete workerThreads_[i];
1287  }
1288 }
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
volatile std::atomic< bool > shutdown_flag
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
bool lumisectionDiscarded(unsigned int ls)
Log< level::Error, false > LogError
unsigned int numConcurrentLumis() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
evf::EvFDaqDirector * daqDirector_
std::vector< unsigned int > thread_quit_signal
unsigned int getLumisectionToStart() const
std::atomic< unsigned int > readingFilesCount_
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile, bool requireHeader=true)
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)

◆ readWorker()

void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1290 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), InputChunk::buf_, cvReader_, cvWakeup_, detectedFRDversion_, change_name::diff, mps_fire::end, eventChunkBlock_, geometryDiff::file, InputChunk::fileIndex_, dqmdumpme::first, edm::streamer::FRDHeaderMaxVersion, mps_fire::i, ALPAKA_ACCELERATOR_NAMESPACE::caPixelDoublets::if(), dqmdumpme::last, LogDebug, SiStripPI::min, mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, thread_quit_signal, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1290  {
1291  bool init = true;
1292 
1293  while (true) {
1294  tid_active_[tid] = false;
1295  std::unique_lock<std::mutex> lk(mReader_);
1296  workerJob_[tid].first = nullptr;
1297  workerJob_[tid].first = nullptr;
1298 
1299  assert(!thread_quit_signal[tid]); //should never get it here
1300  workerPool_.push(tid);
1301 
1302  if (init) {
1303  std::unique_lock<std::mutex> lks(startupLock_);
1304  init = false;
1305  startupCv_.notify_one();
1306  }
1307  cvWakeup_.notify_all();
1308  cvReader_[tid]->wait(lk);
1309  lk.unlock();
1310 
1311  if (thread_quit_signal[tid])
1312  return;
1313  tid_active_[tid] = true;
1314 
1315  InputFile* file;
1316  InputChunk* chunk;
1317 
1318  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1319 
1320  file = workerJob_[tid].first;
1321  chunk = workerJob_[tid].second;
1322 
1323  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1324  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1325 
1326  //if only one worker thread exists, use single fd for all operations
1327  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1328  int fileDescriptor;
1329  bool fileOpenedHere = false;
1330 
1331  if (numConcurrentReads_ == 1) {
1332  fileDescriptor = file->rawFd_;
1333  if (fileDescriptor == -1) {
1334  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1335  fileOpenedHere = true;
1336  file->rawFd_ = fileDescriptor;
1337  }
1338  } else {
1339  if (chunk->offset_ == 0) {
1340  fileDescriptor = file->rawFd_;
1341  file->rawFd_ = -1;
1342  if (fileDescriptor == -1) {
1343  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1344  fileOpenedHere = true;
1345  }
1346  } else {
1347  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1348  fileOpenedHere = true;
1349  }
1350  }
1351 
1352  if (fileDescriptor < 0) {
1353  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1354  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1355  setExceptionState_ = true;
1356  continue;
1357  }
1358  if (fileOpenedHere) { //fast forward to this chunk position
1359  off_t pos = 0;
1360  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1361  if (pos == -1) {
1362  edm::LogError("FedRawDataInputSource")
1363  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1364  << chunk->offset_ << " error: " << strerror(errno);
1365  setExceptionState_ = true;
1366  continue;
1367  }
1368  }
1369 
1370  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1371  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1372 
1373  unsigned int skipped = bufferLeft;
1375  for (unsigned int i = 0; i < readBlocks_; i++) {
1376  ssize_t last;
1377 
1378  //protect against reading into next block
1379  last = ::read(fileDescriptor,
1380  (void*)(chunk->buf_ + bufferLeft),
1381  std::min(chunk->usedSize_ - bufferLeft, (uint64_t)eventChunkBlock_));
1382 
1383  if (last < 0) {
1384  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1385  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1386  setExceptionState_ = true;
1387  break;
1388  }
1389  if (last > 0)
1390  bufferLeft += last;
1391  if (last < eventChunkBlock_) { //last read
1392  //check if this is last block, then total read size must match file size
1393  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (unsigned int)last)) {
1394  edm::LogError("FedRawDataInputSource")
1395  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1396  << " expectedChunkSize:" << chunk->usedSize_
1397  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1398  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1399  setExceptionState_ = true;
1400  }
1401  break;
1402  }
1403  }
1404  if (setExceptionState_)
1405  continue;
1406 
1408  auto diff = end - start;
1409  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1410  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1411  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1412  << " GB/s)";
1413 
1414  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1415  close(fileDescriptor);
1416  fileDescriptor = -1;
1417  if (numConcurrentReads_ == 1)
1418  file->rawFd_ = -1;
1419  }
1420  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1421  close(fileDescriptor);
1422 
1423  //detect FRD event version. Skip file Header if it exists
1424  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1425  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1426  }
1428  chunk->readComplete_ =
1429  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1430  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1431  }
1432 }
Definition: start.py:1
std::condition_variable cvWakeup_
void read(edm::EventPrincipal &eventPrincipal) override
tbb::concurrent_queue< unsigned int > workerPool_
constexpr size_t FRDHeaderMaxVersion
std::vector< ReaderInfo > workerJob_
Log< level::Error, false > LogError
assert(be >=bs)
U second(std::pair< T, U > const &p)
unsigned char * buf_
unsigned int fileIndex_
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
std::vector< unsigned int > thread_quit_signal
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
std::vector< unsigned int > tid_active_
if(threadIdxLocalY==0 &&threadIdxLocalX==0)
#define LogDebug(id)

◆ reportEventsThisLumiInSource()

void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1602 of file FedRawDataInputSource.cc.

References events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextEvent().

1602  {
1603  std::lock_guard<std::mutex> lock(monlock_);
1604  auto itr = sourceEventsReport_.find(lumi);
1605  if (itr != sourceEventsReport_.end())
1606  itr->second += events;
1607  else
1609 }
std::map< unsigned int, unsigned int > sourceEventsReport_
int events

◆ rewind_()

void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 780 of file FedRawDataInputSource.cc.

780 {}

◆ setMonState()

void FedRawDataInputSource::setMonState ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1439 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInState(), and edm::InputSource::state().

Referenced by InputFile::advance(), checkNext(), getNextEvent(), and read().

1439  {
1440  if (fms_)
1441  fms_->setInState(state);
1442 }
ItemTypeInfo state() const
Definition: InputSource.h:361
evf::FastMonitoringService * fms_
void setInState(FastMonState::InputState inputState)

◆ setMonStateSup()

void FedRawDataInputSource::setMonStateSup ( evf::FastMonState::InputState  state)
inline protected

Definition at line 1444 of file FedRawDataInputSource.cc.

References fms_, evf::FastMonitoringService::setInStateSup(), and edm::InputSource::state().

Referenced by readSupervisor().

1444  {
1445  if (fms_)
1447 }
void setInStateSup(FastMonState::InputState inputState)
ItemTypeInfo state() const
Definition: InputSource.h:361
evf::FastMonitoringService * fms_

◆ threadError()

void FedRawDataInputSource::threadError ( )
private

Definition at line 1434 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1434  {
1435  quit_threads_ = true;
1436  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1437 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 44 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ InputFile

friend class InputFile
friend

Definition at line 43 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ bufferInputRead_

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 180 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

◆ checkEvery_

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by read().

◆ chunkIsFree_

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentFile_

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 143 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), read(), and ~FedRawDataInputSource().

◆ currentFileIndex_

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 166 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentLumiSection_

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), maybeOpenNewLumiSection(), and read().

◆ cvReader_

std::vector<std::unique_ptr<std::condition_variable> > FedRawDataInputSource::cvReader_
private

◆ cvWakeup_

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 175 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), readSupervisor(), and readWorker().

◆ daqDirector_

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = nullptr
private

◆ daqProvenanceHelper_

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

◆ defPath_

std::string FedRawDataInputSource::defPath_
private

Definition at line 88 of file FedRawDataInputSource.h.

◆ detectedFRDversion_

uint16_t FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

◆ event_

std::unique_ptr<edm::streamer::FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by checkNext(), fillFEDRawDataCollection(), getNextEvent(), and read().

◆ eventChunkBlock_

unsigned int FedRawDataInputSource::eventChunkBlock_
private

◆ eventChunkSize_

unsigned int FedRawDataInputSource::eventChunkSize_
private

◆ eventID_

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 120 of file FedRawDataInputSource.h.

Referenced by read().

◆ eventRunNumber_

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

◆ eventsThisLumi_

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 128 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and read().

◆ eventsThisRun_

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 129 of file FedRawDataInputSource.h.

◆ fileDeleteLock_

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

◆ fileDescriptor_

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

◆ fileListIndex_

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 110 of file FedRawDataInputSource.h.

Referenced by checkNext(), FedRawDataInputSource(), getFile(), and read().

◆ fileListMode_

const bool FedRawDataInputSource::fileListMode_
private

◆ fileNames_

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by getFile() and initFileList().

◆ fileNamesToDelete_

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 168 of file FedRawDataInputSource.h.

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private

◆ fms_

evf::FastMonitoringService* FedRawDataInputSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 153 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

◆ fuOutputDir_

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 114 of file FedRawDataInputSource.h.

◆ getLSFromFilename_

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and readSupervisor().

◆ GTPEventID_

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ L1EventID_

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by checkNext() and read().

◆ loopModeIterationInc_

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ MAXTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ MINTCDSuTCAFEDID_

uint16_t FedRawDataInputSource::MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and fillFEDRawDataCollection().

◆ monlock_

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 185 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ mReader_

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

◆ mWakeup_

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getNextEvent() and readSupervisor().

◆ numBuffers_

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ numConcurrentReads_

unsigned int FedRawDataInputSource::numConcurrentReads_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource() and readWorker().

◆ processHistoryID_

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

◆ quit_threads_

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

◆ readBlocks_

unsigned int FedRawDataInputSource::readBlocks_
private

◆ readingFilesCount_

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 96 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 147 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ runNumber_

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 113 of file FedRawDataInputSource.h.

Referenced by checkNext() and FedRawDataInputSource().

◆ setExceptionState_

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 162 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

◆ singleBufferMode_

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 184 of file FedRawDataInputSource.h.

Referenced by getEventReport() and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by checkNext() and ~FedRawDataInputSource().

◆ startupCv_

std::condition_variable FedRawDataInputSource::startupCv_
private

◆ startupLock_

std::mutex FedRawDataInputSource::startupLock_
private

◆ streamFileTracker_

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by read().

◆ tcds_pointer_

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 127 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection() and read().

◆ testTCDSFEDRange_

const std::vector<unsigned int> FedRawDataInputSource::testTCDSFEDRange_
private

Definition at line 103 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ thread_quit_signal

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

◆ threadInit_

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 182 of file FedRawDataInputSource.h.

◆ tid_active_

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool FedRawDataInputSource::useFileBroker_
private

◆ useL1EventID_

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by read().

◆ verifyChecksum_

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

◆ workerJob_

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by readSupervisor() and readWorker().

◆ workerThreads_

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private