CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr
< BranchIDListHelper const > 
branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr
< BranchIDListHelper > & 
branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSourceoperator= (InputSource const &)=delete
 
std::shared_ptr
< ProcessBlockHelper const > 
processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr
< ProcessBlockHelper > & 
processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr
< ProductRegistry > & 
productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair
< SharedResourcesAcquirer
*, std::recursive_mutex * > 
resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr
< ThinnedAssociationsHelper
const > 
thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr
< ThinnedAssociationsHelper > & 
thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 
ProductRegistryproductRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile
*, InputChunk * > 
ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &rawData, bool &tcdsInRange)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFilecurrentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector
< std::condition_variable * > 
cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint16_t detectedFRDversion_ = 0
 
std::unique_ptr< FRDEventMsgViewevent_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int,
std::string > > 
fileNamesToDelete_
 
tbb::concurrent_queue
< std::unique_ptr< InputFile > > 
fileQueue_
 
std::list< std::pair< int,
std::unique_ptr< InputFile > > > 
filesToDelete_
 
evf::FastMonitoringServicefms_ = nullptr
 
tbb::concurrent_queue
< InputChunk * > 
freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
 
uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int,
unsigned int > 
sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue
< unsigned int > 
workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 Open Root file and provide MEs ############. More...
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext
const &, ModuleCallingContext
const &)> 
postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext
const &, ModuleCallingContext
const &)> 
preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 40 of file FedRawDataInputSource.h.

Member Typedef Documentation

Definition at line 138 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 51 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), MAXTCDSuTCAFEDID_, MINTCDSuTCAFEDID_, numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, threadInit_, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

53  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
54  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
55  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
56  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
57  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
58  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
59  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
60  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
61  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
63  pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  eventID_(),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  if (!testTCDSFEDRange_.empty()) {
80  if (testTCDSFEDRange_.size() != 2) {
81  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
82  << "Invalid TCDS Test FED range parameter";
83  }
86  }
87 
88  long autoRunNumber = -1;
89  if (fileListMode_) {
90  autoRunNumber = initFileList();
91  if (!fileListLoopMode_) {
92  if (autoRunNumber < 0)
93  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
94  //override run number
95  runNumber_ = (edm::RunNumber_t)autoRunNumber;
96  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
97  }
98  }
99 
101  setNewRun();
102  //todo:autodetect from file name (assert if names differ)
104 
105  //make sure that chunk size is N * block size
110 
111  if (!numBuffers_)
112  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
113  << "no reading enabled with numBuffers parameter 0";
114 
117  readingFilesCount_ = 0;
118 
119  if (!crc32c_hw_test())
120  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
121 
122  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
123  if (fileListMode_) {
124  try {
126  } catch (cms::Exception const&) {
127  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
128  }
129  } else {
131  if (!fms_) {
132  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
133  }
134  }
135 
137  if (!daqDirector_)
138  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
139 
141  if (useFileBroker_)
142  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
143  //set DaqDirector to delete files in preGlobalEndLumi callback
145  if (fms_) {
147  fms_->setInputSource(this);
150  }
151  //should delete chunks when run stops
152  for (unsigned int i = 0; i < numBuffers_; i++) {
154  }
155 
156  quit_threads_ = false;
157 
158  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
159  std::unique_lock<std::mutex> lk(startupLock_);
160  //issue a memory fence here and in threads (constructor was segfaulting without this)
161  thread_quit_signal.push_back(false);
162  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
163  cvReader_.push_back(new std::condition_variable);
164  tid_active_.push_back(0);
165  threadInit_.store(false, std::memory_order_release);
166  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
167  startupCv_.wait(lk);
168  }
169 
170  runAuxiliary()->setProcessHistoryID(processHistoryID_);
171 }
std::vector< std::string > fileNames_
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
bool crc32c_hw_test()
Definition: crc32c.cc:354
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
std::pair< InputFile *, InputChunk * > ReaderInfo
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
bool useFileBroker() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
static Timestamp beginOfTime()
Definition: Timestamp.h:84
const std::vector< unsigned int > testTCDSFEDRange_
edm::ProcessHistoryID processHistoryID_
const edm::DaqProvenanceHelper daqProvenanceHelper_
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
std::vector< std::thread * > workerThreads_
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
void setInStateSup(FastMonState::InputState inputState)
std::condition_variable startupCv_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:230
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
std::atomic< bool > threadInit_
evf::EvFDaqDirector * daqDirector_
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
void readWorker(unsigned int tid)
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
tbb::concurrent_queue< InputChunk * > freeChunks_
void setFMS(evf::FastMonitoringService *fms)
void setInState(FastMonState::InputState inputState)
FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 173 of file FedRawDataInputSource.cc.

References cvReader_, filesToDelete_, mps_fire::i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

173  {
174  quit_threads_ = true;
175 
176  //delete any remaining open files
177  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
178  it->second.reset();
179 
181  readSupervisorThread_->join();
182  } else {
183  //join aux threads in case the supervisor thread was not started
184  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
185  std::unique_lock<std::mutex> lk(mReader_);
186  thread_quit_signal[i] = true;
187  cvReader_[i]->notify_one();
188  lk.unlock();
189  workerThreads_[i]->join();
190  delete workerThreads_[i];
191  }
192  }
193  for (unsigned int i = 0; i < numConcurrentReads_; i++)
194  delete cvReader_[i];
195  /*
196  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
197  InputChunk *ch;
198  while (!freeChunks_.try_pop(ch)) {}
199  delete ch;
200  }
201  */
202 }
std::atomic< bool > quit_threads_
std::unique_ptr< std::thread > readSupervisorThread_
std::vector< std::condition_variable * > cvReader_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::vector< std::thread * > workerThreads_
std::vector< unsigned int > thread_quit_signal

Member Function Documentation

edm::RawInputSource::Next FedRawDataInputSource::checkNext ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 229 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

229  {
231  //this thread opens new files and dispatches reading to worker readers
232  //threadInit_.store(false,std::memory_order_release);
233  std::unique_lock<std::mutex> lk(startupLock_);
234  readSupervisorThread_ = std::make_unique<std::thread>(&FedRawDataInputSource::readSupervisor, this);
236  startupCv_.wait(lk);
237  }
238  //signal hltd to start event accounting
239  if (!currentLumiSection_)
242  switch (nextEvent()) {
244  //maybe create EoL file in working directory before ending run
245  struct stat buf;
246  if (!useFileBroker_ && currentLumiSection_ > 0) {
247  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
248  if (eolFound) {
250  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
251  if (!found) {
253  int eol_fd =
254  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
255  close(eol_fd);
257  }
258  }
259  }
260  //also create EoR file in FU data directory
261  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
262  if (!eorFound) {
263  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
264  O_RDWR | O_CREAT,
265  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
266  close(eor_fd);
267  }
269  eventsThisLumi_ = 0;
271  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
272  return Next::kStop;
273  }
275  //this is not reachable
276  return Next::kEvent;
277  }
279  //std::cout << "--------------NEW LUMI---------------" << std::endl;
280  return Next::kEvent;
281  }
282  default: {
283  if (!getLSFromFilename_) {
284  //get new lumi from file header
285  if (event_->lumi() > currentLumiSection_) {
287  eventsThisLumi_ = 0;
289  }
290  }
293  else
294  eventRunNumber_ = event_->run();
295  L1EventID_ = event_->event();
296 
297  setEventCached();
298 
299  return Next::kEvent;
300  }
301  }
302 }
void maybeOpenNewLumiSection(const uint32_t lumiSection)
void createProcessingNotificationMaybe() const
std::unique_ptr< std::thread > readSupervisorThread_
std::string getEoLSFilePathOnBU(const unsigned int ls) const
void setMonState(evf::FastMonState::InputState state)
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:362
Log< level::Info, false > LogInfo
std::unique_ptr< FRDEventMsgView > event_
std::condition_variable startupCv_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
evf::EvFDaqDirector * daqDirector_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
std::string getEoLSFilePathOnFU(const unsigned int ls) const
evf::EvFDaqDirector::FileStatus nextEvent()
std::string getEoRFilePathOnFU() const
bool FedRawDataInputSource::exceptionState ( )
inlineprivate

Definition at line 68 of file FedRawDataInputSource.h.

References setExceptionState_.

Referenced by InputFile::advance().

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 204 of file FedRawDataInputSource.cc.

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), submitPVResolutionJobs::desc, edm::ParameterSetDescription::setAllowAnything(), edm::ParameterSetDescription::setComment(), and edm::ParameterDescriptionNode::setComment().

204  {
206  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
207  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
208  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
209  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
210  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
211  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
212  ->setComment("Maximum number of simultaneously buffered raw files");
213  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
214  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
215  desc.addUntracked<bool>("verifyChecksum", true)
216  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
217  desc.addUntracked<bool>("useL1EventID", false)
218  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
219  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
220  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
221  desc.addUntracked<bool>("fileListMode", false)
222  ->setComment("Use fileNames parameter to directly specify raw files to open");
223  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
224  ->setComment("file list used when fileListMode is enabled");
225  desc.setAllowAnything();
226  descriptions.add("source", desc);
227 }
void setComment(std::string const &value)
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
void setAllowAnything()
allow any parameter label/value pairs
void setComment(std::string const &value)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection rawData,
bool &  tcdsInRange 
)
private

Definition at line 687 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), FEDRawData::data(), edmPickEvents::event, event_, evf::evtn::evm_board_sense(), Exception, FEDRawDataCollection::FEDData(), l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MAXTCDSuTCAFEDID, MAXTCDSuTCAFEDID_, FEDNumbering::MINTCDSuTCAFEDID, MINTCDSuTCAFEDID_, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, FEDRawData::resize(), FEDHeader::sourceID(), and tcds_pointer_.

Referenced by read().

687  {
688  edm::TimeValue_t time;
689  timeval stv;
690  gettimeofday(&stv, nullptr);
691  time = stv.tv_sec;
692  time = (time << 32) + stv.tv_usec;
693  edm::Timestamp tstamp(time);
694 
695  uint32_t eventSize = event_->eventSize();
696  unsigned char* event = (unsigned char*)event_->payload();
697  GTPEventID_ = 0;
698  tcds_pointer_ = nullptr;
699  tcdsInRange = false;
700  uint16_t selectedTCDSFed = 0;
701  while (eventSize > 0) {
702  assert(eventSize >= FEDTrailer::length);
703  eventSize -= FEDTrailer::length;
704  const FEDTrailer fedTrailer(event + eventSize);
705  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
706  assert(eventSize >= fedSize - FEDHeader::length);
707  eventSize -= (fedSize - FEDHeader::length);
708  const FEDHeader fedHeader(event + eventSize);
709  const uint16_t fedId = fedHeader.sourceID();
710  if (fedId > FEDNumbering::MAXFEDID) {
711  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
712  } else if (fedId >= MINTCDSuTCAFEDID_ && fedId <= MAXTCDSuTCAFEDID_) {
713  if (!selectedTCDSFed) {
714  selectedTCDSFed = fedId;
715  tcds_pointer_ = event + eventSize;
717  tcdsInRange = true;
718  }
719  } else
720  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection")
721  << "Second TCDS FED ID " << fedId << " found. First ID: " << selectedTCDSFed;
722  }
723  if (fedId == FEDNumbering::MINTriggerGTPFEDID) {
724  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
725  GTPEventID_ = evf::evtn::get(event + eventSize, true);
726  else
727  GTPEventID_ = evf::evtn::get(event + eventSize, false);
728  //evf::evtn::evm_board_setformat(fedSize);
729  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
730  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
731  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
732  }
733  //take event ID from GTPE FED
734  if (fedId == FEDNumbering::MINTriggerEGTPFEDID && GTPEventID_ == 0) {
735  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
736  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
737  }
738  }
739  FEDRawData& fedData = rawData.FEDData(fedId);
740  fedData.resize(fedSize);
741  memcpy(fedData.data(), event + eventSize, fedSize);
742  }
743  assert(eventSize == 0);
744 
745  return tstamp;
746 }
unsigned int getgpshigh(const unsigned char *)
bool gtpe_board_sense(const unsigned char *p)
static const uint32_t length
Definition: FEDTrailer.h:57
unsigned int get(const unsigned char *, bool)
static const uint32_t length
Definition: FEDHeader.h:54
assert(be >=bs)
const FEDRawData & FEDData(int fedid) const
retrieve data for fed
void resize(size_t newsize)
Definition: FEDRawData.cc:28
unsigned long long TimeValue_t
Definition: Timestamp.h:28
bool evm_board_sense(const unsigned char *p, size_t size)
unsigned long long uint64_t
Definition: Time.h:13
std::unique_ptr< FRDEventMsgView > event_
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
unsigned int gtpe_get(const unsigned char *)
unsigned int getgpslow(const unsigned char *)
std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1541 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1541  {
1542  std::lock_guard<std::mutex> lock(monlock_);
1543  auto itr = sourceEventsReport_.find(lumi);
1544  if (itr != sourceEventsReport_.end()) {
1545  std::pair<bool, unsigned int> ret(true, itr->second);
1546  if (erase)
1547  sourceEventsReport_.erase(itr);
1548  return ret;
1549  } else
1550  return std::pair<bool, unsigned int>(false, 0);
1551 }
tuple ret
prodAgent to be discontinued
std::map< unsigned int, unsigned int > sourceEventsReport_
list lumi
Definition: dqmdumpme.py:53
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1588 of file FedRawDataInputSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, evf::EvFDaqDirector::newFile, fed_dqm_sourceclient-live_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1591  {
1592  if (fileListIndex_ < fileNames_.size()) {
1593  nextFile = fileNames_[fileListIndex_];
1594  if (nextFile.find("file://") == 0)
1595  nextFile = nextFile.substr(7);
1596  else if (nextFile.find("file:") == 0)
1597  nextFile = nextFile.substr(5);
1598  std::filesystem::path fileName = nextFile;
1599  std::string fileStem = fileName.stem().string();
1600  if (fileStem.find("ls"))
1601  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1602  if (fileStem.find('_'))
1603  fileStem = fileStem.substr(0, fileStem.find('_'));
1604 
1605  if (!fileListLoopMode_)
1606  ls = std::stoul(fileStem);
1607  else //always starting from LS 1 in loop mode
1608  ls = 1 + loopModeIterationInc_;
1609 
1610  //fsize = 0;
1611  //lockWaitTime = 0;
1612  fileListIndex_++;
1614  } else {
1615  if (!fileListLoopMode_)
1617  else {
1618  //loop through files until interrupted
1620  fileListIndex_ = 0;
1621  return getFile(ls, nextFile, fsize, lockWaitTime);
1622  }
1623  }
1624 }
std::vector< std::string > fileNames_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
def ls
Definition: eostools.py:349
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inlineprivate

Definition at line 350 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, FRDHeaderMaxVersion, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, setMonState(), singleBufferMode_, mps_update::status, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

350  {
351  if (setExceptionState_)
352  threadError();
353  if (!currentFile_.get()) {
356  if (!fileQueue_.try_pop(currentFile_)) {
357  //sleep until wakeup (only in single-buffer mode) or timeout
358  std::unique_lock<std::mutex> lkw(mWakeup_);
359  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
361  }
362  status = currentFile_->status_;
363  if (status == evf::EvFDaqDirector::runEnded) {
365  currentFile_.reset();
366  return status;
367  } else if (status == evf::EvFDaqDirector::runAbort) {
368  throw cms::Exception("FedRawDataInputSource::getNextEvent")
369  << "Run has been aborted by the input source reader thread";
370  } else if (status == evf::EvFDaqDirector::newLumi) {
372  if (getLSFromFilename_) {
373  if (currentFile_->lumi_ > currentLumiSection_) {
375  eventsThisLumi_ = 0;
377  }
378  } else { //let this be picked up from next event
380  }
381  currentFile_.reset();
382  return status;
383  } else if (status == evf::EvFDaqDirector::newFile) {
385  } else
386  assert(false);
387  }
389 
390  //file is empty
391  if (!currentFile_->fileSize_) {
393  //try to open new lumi
394  assert(currentFile_->nChunks_ == 0);
395  if (getLSFromFilename_)
396  if (currentFile_->lumi_ > currentLumiSection_) {
398  eventsThisLumi_ = 0;
400  }
401  //immediately delete empty file
402  currentFile_.reset();
404  }
405 
406  //file is finished
407  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
409  //release last chunk (it is never released elsewhere)
410  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
411  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
412  throw cms::Exception("FedRawDataInputSource::getNextEvent")
413  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
414  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
415  }
416  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
417  if (singleBufferMode_) {
418  std::unique_lock<std::mutex> lkw(mWakeup_);
419  cvWakeup_.notify_one();
420  }
421  bufferInputRead_ = 0;
423  //put the file in pending delete list;
424  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
425  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
426  } else {
427  //in single-thread and stream jobs, events are already processed
428  currentFile_.reset();
429  }
431  }
432 
433  //handle RAW file header
434  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
435  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
436  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
437  throw cms::Exception("FedRawDataInputSource::getNextEvent")
438  << "Premature end of input file while reading file header";
439 
440  edm::LogWarning("FedRawDataInputSource")
441  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
442  if (getLSFromFilename_)
443  if (currentFile_->lumi_ > currentLumiSection_) {
445  eventsThisLumi_ = 0;
447  }
448  }
449 
450  //advance buffer position to skip file header (chunk will be acquired later)
451  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
452  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
453  }
454 
455  //file is too short
456  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
457  throw cms::Exception("FedRawDataInputSource::getNextEvent")
458  << "Premature end of input file while reading event header";
459  }
460  if (singleBufferMode_) {
461  //should already be there
463  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
464  usleep(10000);
465  if (currentFile_->parent_->exceptionState() || setExceptionState_)
466  currentFile_->parent_->threadError();
467  }
469 
470  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
471 
472  //conditions when read amount is not sufficient for the header to fit
473  if (!bufferInputRead_ || bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_] ||
474  eventChunkSize_ - currentFile_->chunkPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
476 
477  if (detectedFRDversion_ == 0) {
478  detectedFRDversion_ = *((uint16_t*)dataPosition);
479  if (detectedFRDversion_ > FRDHeaderMaxVersion)
480  throw cms::Exception("FedRawDataInputSource::getNextEvent")
481  << "Unknown FRD version -: " << detectedFRDversion_;
482  assert(detectedFRDversion_ >= 1);
483  }
484 
485  //recalculate chunk position
486  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
487  if (bufferInputRead_ < FRDHeaderVersionSize[detectedFRDversion_]) {
488  throw cms::Exception("FedRawDataInputSource::getNextEvent")
489  << "Premature end of input file while reading event header";
490  }
491  }
492 
493  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
494  if (event_->size() > eventChunkSize_) {
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
497  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
498  << " bytes";
499  }
500 
501  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
502 
503  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event data";
506  }
507  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
511  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
512  }
513  currentFile_->bufferPosition_ += event_->size();
514  currentFile_->chunkPosition_ += event_->size();
515  //last chunk is released when this function is invoked next time
516 
517  }
518  //multibuffer mode:
519  else {
520  //wait for the current chunk to become added to the vector
522  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
523  usleep(10000);
524  if (setExceptionState_)
525  threadError();
526  }
528 
529  //check if header is at the boundary of two chunks
530  chunkIsFree_ = false;
531  unsigned char* dataPosition;
532 
533  //read header, copy it to a single chunk if necessary
534  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
535 
536  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
537  if (event_->size() > eventChunkSize_) {
538  throw cms::Exception("FedRawDataInputSource::getNextEvent")
539  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
540  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
541  << " bytes";
542  }
543 
544  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
545 
546  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
547  throw cms::Exception("FedRawDataInputSource::getNextEvent")
548  << "Premature end of input file while reading event data";
549  }
550 
551  if (chunkEnd) {
552  //header was at the chunk boundary, we will have to move payload as well
553  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
554  chunkIsFree_ = true;
555  } else {
556  //header was contiguous, but check if payload fits the chunk
557  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
558  //rewind to header start position
559  currentFile_->rewindChunk(FRDHeaderVersionSize[detectedFRDversion_]);
560  //copy event to a chunk start and move pointers
561 
563 
564  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
565 
567 
568  assert(chunkEnd);
569  chunkIsFree_ = true;
570  //header is moved
571  event_ = std::make_unique<FRDEventMsgView>(dataPosition);
572  } else {
573  //everything is in a single chunk, only move pointers forward
574  chunkEnd = currentFile_->advance(dataPosition, msgSize);
575  assert(!chunkEnd);
576  chunkIsFree_ = false;
577  }
578  }
579  } //end multibuffer mode
581 
582  if (verifyChecksum_ && event_->version() >= 5) {
583  uint32_t crc = 0;
584  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
585  if (crc != event_->crc32c()) {
586  if (fms_)
588  throw cms::Exception("FedRawDataInputSource::getNextEvent")
589  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
590  << crc;
591  }
592  } else if (verifyChecksum_ && event_->version() >= 3) {
593  uint32_t adler = adler32(0L, Z_NULL, 0);
594  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
595 
596  if (adler != event_->adler32()) {
597  if (fms_)
599  throw cms::Exception("FedRawDataInputSource::getNextEvent")
600  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
601  << adler;
602  }
603  }
605 
606  currentFile_->nProcessed_++;
607 
609 }
std::condition_variable cvWakeup_
constexpr size_t FRDHeaderMaxVersion
void setExceptionDetected(unsigned int ls)
void maybeOpenNewLumiSection(const uint32_t lumiSection)
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
list status
Definition: mps_update.py:107
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
def move
Definition: eostools.py:511
void setMonState(evf::FastMonState::InputState state)
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
std::unique_ptr< FRDEventMsgView > event_
evf::EvFDaqDirector * daqDirector_
std::atomic< unsigned int > readingFilesCount_
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
evf::FastMonitoringService * fms_
constexpr std::array< uint32, FRDHeaderMaxVersion+1 > FRDHeaderVersionSize
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
void readNextChunkIntoBuffer(InputFile *file)
std::unique_ptr< InputFile > currentFile_
long FedRawDataInputSource::initFileList ( )
private

Definition at line 1553 of file FedRawDataInputSource.cc.

References a, b, dataset::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, fileNames_, fed_dqm_sourceclient-live_cfg::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

1553  {
1554  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1555  if (a.rfind('/') != std::string::npos)
1556  a = a.substr(a.rfind('/'));
1557  if (b.rfind('/') != std::string::npos)
1558  b = b.substr(b.rfind('/'));
1559  return b > a;
1560  });
1561 
1562  if (!fileNames_.empty()) {
1563  //get run number from first file in the vector
1565  std::string fileStem = fileName.stem().string();
1566  if (fileStem.find("file://") == 0)
1567  fileStem = fileStem.substr(7);
1568  else if (fileStem.find("file:") == 0)
1569  fileStem = fileStem.substr(5);
1570  auto end = fileStem.find('_');
1571 
1572  if (fileStem.find("run") == 0) {
1573  std::string runStr = fileStem.substr(3, end - 3);
1574  try {
1575  //get long to support test run numbers < 2^32
1576  long rval = std::stol(runStr);
1577  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1578  return rval;
1579  } catch (const std::exception&) {
1580  edm::LogWarning("FedRawDataInputSource")
1581  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1582  }
1583  }
1584  }
1585  return -1;
1586 }
std::vector< std::string > fileNames_
Log< level::Info, false > LogInfo
double b
Definition: hdecay.h:118
double a
Definition: hdecay.h:119
string end
Definition: dataset.py:937
Log< level::Warning, false > LogWarning
void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 304 of file FedRawDataInputSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), edm_modernize_messagelogger::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNext(), and getNextEvent().

304  {
305  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
306  if (!useFileBroker_) {
307  if (currentLumiSection_ > 0) {
309  struct stat buf;
310  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
311  if (!found) {
313  int eol_fd =
314  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
315  close(eol_fd);
316  daqDirector_->createBoLSFile(lumiSection, false);
318  }
319  } else
320  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
321  }
322 
323  currentLumiSection_ = lumiSection;
324 
326 
327  timeval tv;
328  gettimeofday(&tv, nullptr);
329  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
330 
332  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
333 
334  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
335  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
336 
337  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
338  }
339 }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
edm::ProcessHistoryID processHistoryID_
Log< level::Info, false > LogInfo
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:230
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
evf::EvFDaqDirector * daqDirector_
std::string getEoLSFilePathOnFU(const unsigned int ls) const
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:233
evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inlineprivate

Definition at line 341 of file FedRawDataInputSource.cc.

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNext().

341  {
343  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
344  if (edm::shutdown_flag.load(std::memory_order_relaxed))
345  break;
346  }
347  return status;
348 }
volatile std::atomic< bool > shutdown_flag
list status
Definition: mps_update.py:107
def load
Definition: svgfig.py:547
evf::EvFDaqDirector::FileStatus getNextEvent()
void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 611 of file FedRawDataInputSource.cc.

References printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, runTheMatrix::const, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), event_, eventID_, eventRunNumber_, eventsThisLumi_, Exception, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), setMonState(), edm::EventAuxiliary::setProcessHistoryID(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), readNextChunkIntoBuffer(), and readWorker().

611  {
613  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
614  bool tcdsInRange;
615  edm::Timestamp tstamp = fillFEDRawDataCollection(*rawData, tcdsInRange);
616 
617  if (useL1EventID_) {
620  aux.setProcessHistoryID(processHistoryID_);
621  makeEvent(eventPrincipal, aux);
622  } else if (tcds_pointer_ == nullptr) {
623  if (!GTPEventID_) {
624  throw cms::Exception("FedRawDataInputSource::read")
625  << "No TCDS or GTP FED in event with FEDHeader EID -: " << L1EventID_;
626  }
629  aux.setProcessHistoryID(processHistoryID_);
630  makeEvent(eventPrincipal, aux);
631  } else {
632  const FEDHeader fedHeader(tcds_pointer_);
633  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
638  event_->isRealData(),
639  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
640  processGUID(),
642  !tcdsInRange);
644  makeEvent(eventPrincipal, aux);
645  }
646 
647  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
648 
650 
651  eventsThisLumi_++;
653 
654  //resize vector if needed
655  while (streamFileTracker_.size() <= eventPrincipal.streamID())
656  streamFileTracker_.push_back(-1);
657 
658  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
659 
660  //this old file check runs no more often than every 10 events
661  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
662  //delete files that are not in processing
663  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
664  auto it = filesToDelete_.begin();
665  while (it != filesToDelete_.end()) {
666  bool fileIsBeingProcessed = false;
667  for (unsigned int i = 0; i < nStreams_; i++) {
668  if (it->first == streamFileTracker_.at(i)) {
669  fileIsBeingProcessed = true;
670  break;
671  }
672  }
673  if (!fileIsBeingProcessed) {
674  std::string fileToDelete = it->second->fileName_;
675  it = filesToDelete_.erase(it);
676  } else
677  it++;
678  }
679  }
680  if (chunkIsFree_)
681  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
682  chunkIsFree_ = false;
684  return;
685 }
static const uint32_t length
Definition: FEDHeader.h:54
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &rawData, bool &tcdsInRange)
ProductProvenance const & dummyProvenance() const
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:195
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, bool isRealData, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection, bool suppressWarning)
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
std::vector< int > streamFileTracker_
def move
Definition: eostools.py:511
StreamID streamID() const
BranchDescription const & branchDescription() const
edm::ProcessHistoryID processHistoryID_
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
const edm::DaqProvenanceHelper daqProvenanceHelper_
void setMonState(evf::FastMonState::InputState state)
std::unique_ptr< FRDEventMsgView > event_
void setProcessHistoryID(ProcessHistoryID const &phid)
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
tbb::concurrent_queue< InputChunk * > freeChunks_
std::unique_ptr< InputFile > currentFile_
void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1461 of file FedRawDataInputSource.cc.

References bufferInputRead_, InputFile::chunkPosition_, InputFile::chunks_, eventChunkBlock_, eventChunkSize_, Exception, fileDescriptor_, InputFile::fileName_, InputFile::fileSize_, mps_fire::i, dqmdumpme::last, LogDebug, InputFile::rawFd_, InputFile::rawHeaderSize_, read(), and readBlocks_.

Referenced by getNextEvent().

1461  {
1462  uint32_t existingSize = 0;
1463 
1464  if (fileDescriptor_ < 0) {
1465  bufferInputRead_ = 0;
1466  if (file->rawFd_ == -1) {
1467  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1468  if (file->rawHeaderSize_)
1469  lseek(fileDescriptor_, file->rawHeaderSize_, SEEK_SET);
1470  } else
1471  fileDescriptor_ = file->rawFd_;
1472 
1473  //skip header size in destination buffer (chunk position was already adjusted)
1475  existingSize += file->rawHeaderSize_;
1476 
1477  if (fileDescriptor_ >= 0)
1478  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1479  else {
1480  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1481  << "failed to open file " << std::endl
1482  << file->fileName_ << " fd:" << fileDescriptor_;
1483  }
1484  //fill chunk (skipping file header if present)
1485  for (unsigned int i = 0; i < readBlocks_; i++) {
1486  const ssize_t last = ::read(fileDescriptor_,
1487  (void*)(file->chunks_[0]->buf_ + existingSize),
1488  eventChunkBlock_ - (i == readBlocks_ - 1 ? existingSize : 0));
1490  existingSize += last;
1491  }
1492 
1493  } else {
1494  //continue reading
1495  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1496  for (unsigned int i = 0; i < readBlocks_; i++) {
1497  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1499  existingSize += last;
1500  }
1501  } else {
1502  //event didn't fit in last chunk, so leftover must be moved to the beginning and completed
1503  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1504  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1505 
1506  //calculate amount of data that can be added
1507  const uint32_t blockcount = file->chunkPosition_ / eventChunkBlock_;
1508  const uint32_t leftsize = file->chunkPosition_ % eventChunkBlock_;
1509 
1510  for (uint32_t i = 0; i < blockcount; i++) {
1511  const ssize_t last =
1512  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1514  existingSizeLeft += last;
1515  }
1516  if (leftsize) {
1517  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1519  }
1520  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1521  }
1522  }
1523  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1524  if (fileDescriptor_ != -1) {
1525  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1526  close(fileDescriptor_);
1527  file->rawFd_ = fileDescriptor_ = -1;
1528  }
1529  }
1530 }
uint32_t chunkPosition_
void read(edm::EventPrincipal &eventPrincipal) override
uint16_t rawHeaderSize_
std::string fileName_
tbb::concurrent_vector< InputChunk * > chunks_
tuple last
Definition: dqmdumpme.py:56
#define LogDebug(id)
void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 750 of file FedRawDataInputSource.cc.

References alwaysStartFromFirstLS_, cms::cuda::assert(), counter, cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), evf::EvFDaqDirector::grabNextJsonFromRaw(), mps_fire::i, EcalCondDB::inf, InputFile, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), maxBufferedFiles_, min(), eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::parseFRDFileHeader(), fed_dqm_sourceclient-live_cfg::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::unlockFULocal(), evf::EvFDaqDirector::updateFuLock(), useFileBroker_, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

750  {
751  bool stop = false;
752  unsigned int currentLumiSection = 0;
753  //threadInit_.exchange(true,std::memory_order_acquire);
754 
755  {
756  std::unique_lock<std::mutex> lk(startupLock_);
757  startupCv_.notify_one();
758  }
759 
760  uint32_t ls = 0;
761  uint32_t monLS = 1;
762  uint32_t lockCount = 0;
763  uint64_t sumLockWaitTimeUs = 0.;
764 
765  while (!stop) {
766  //wait for at least one free thread and chunk
767  int counter = 0;
768 
769  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
771  //report state to monitoring
772  if (fms_) {
773  bool copy_active = false;
774  for (auto j : tid_active_)
775  if (j)
776  copy_active = true;
779  else if (freeChunks_.empty()) {
780  if (copy_active)
782  else
784  } else {
785  if (copy_active)
787  else
789  }
790  }
791  std::unique_lock<std::mutex> lkw(mWakeup_);
792  //sleep until woken up by condition or a timeout
793  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
794  counter++;
795  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
796  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
797  } else {
798  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
799  }
800  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
801  stop = true;
802  break;
803  }
804  }
805  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
806 
807  if (stop)
808  break;
809 
810  //look for a new file
811  std::string nextFile;
812  uint32_t fileSizeIndex;
813  int64_t fileSizeFromMetadata;
814 
815  if (fms_) {
818  }
819 
821  uint16_t rawHeaderSize = 0;
822  uint32_t lsFromRaw = 0;
823  int32_t serverEventsInNewFile = -1;
824  int rawFd = -1;
825 
826  int backoff_exp = 0;
827 
828  //entering loop which tries to grab new file from ramdisk
829  while (status == evf::EvFDaqDirector::noFile) {
830  //check if hltd has signalled to throttle input
831  counter = 0;
832  while (daqDirector_->inputThrottled()) {
833  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
834  break;
836  if (!(counter % 50))
837  edm::LogWarning("FedRawDataInputSource") << "Input throttled detected, reading files is paused...";
838  usleep(100000);
839  counter++;
840  }
841 
842  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
843  stop = true;
844  break;
845  }
846 
847  assert(rawFd == -1);
848  uint64_t thisLockWaitTimeUs = 0.;
850  if (fileListMode_) {
851  //return LS if LS not set, otherwise return file
852  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
853  if (status == evf::EvFDaqDirector::newFile) {
855  rawFd,
856  rawHeaderSize,
857  lsFromRaw,
858  serverEventsInNewFile,
859  fileSizeFromMetadata,
860  false,
861  false,
862  false) != 0) {
863  //error
864  setExceptionState_ = true;
865  stop = true;
866  break;
867  }
868  if (!getLSFromFilename_)
869  ls = lsFromRaw;
870  }
871  } else if (!useFileBroker_)
872  status = daqDirector_->updateFuLock(
873  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
874  else {
875  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
876  ls,
877  nextFile,
878  rawFd,
879  rawHeaderSize,
880  serverEventsInNewFile,
881  fileSizeFromMetadata,
882  thisLockWaitTimeUs);
883  }
884 
886 
887  //cycle through all remaining LS even if no files get assigned
888  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
890 
891  //monitoring of lock wait time
892  if (thisLockWaitTimeUs > 0.)
893  sumLockWaitTimeUs += thisLockWaitTimeUs;
894  lockCount++;
895  if (ls > monLS) {
896  monLS = ls;
897  if (lockCount)
898  if (fms_)
899  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
900  lockCount = 0;
901  sumLockWaitTimeUs = 0;
902  }
903 
904  //check again for any remaining index/EoLS files after EoR file is seen
907  usleep(100000);
908  //now all files should have appeared in ramdisk, check again if any raw files were left behind
909  status = daqDirector_->updateFuLock(
910  ls, nextFile, fileSizeIndex, rawHeaderSize, thisLockWaitTimeUs, setExceptionState_);
911  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
913  }
914 
915  if (status == evf::EvFDaqDirector::runEnded) {
916  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
917  fileQueue_.push(std::move(inf));
918  stop = true;
919  break;
920  }
921 
922  //error from filelocking function
923  if (status == evf::EvFDaqDirector::runAbort) {
924  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
925  fileQueue_.push(std::move(inf));
926  stop = true;
927  break;
928  }
929  //queue new lumisection
930  if (getLSFromFilename_) {
931  if (ls > currentLumiSection) {
932  if (!useFileBroker_) {
933  //file locking
934  //setMonStateSup(inSupNewLumi);
935  currentLumiSection = ls;
936  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
937  fileQueue_.push(std::move(inf));
938  } else {
939  //new file service
940  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
942  //start transitions from LS specified by env, continue if not reached
943  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
944  //skip file if from earlier LS than specified by env
945  if (rawFd != -1) {
946  close(rawFd);
947  rawFd = -1;
948  }
950  continue;
951  } else {
952  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
953  fileQueue_.push(std::move(inf));
954  }
955  } else if (ls < 100) {
956  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
957  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
958 
959  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
960  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
961  fileQueue_.push(std::move(inf));
962  }
963  } else {
964  //start from current LS
965  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
966  fileQueue_.push(std::move(inf));
967  }
968  } else {
969  //queue all lumisections after last one seen to avoid gaps
970  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
971  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
972  fileQueue_.push(std::move(inf));
973  }
974  }
975  currentLumiSection = ls;
976  }
977  }
978  //else
979  if (currentLumiSection > 0 && ls < currentLumiSection) {
980  edm::LogError("FedRawDataInputSource")
981  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
982  << ". Aborting execution." << std::endl;
983  if (rawFd != -1)
984  close(rawFd);
985  rawFd = -1;
986  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
987  fileQueue_.push(std::move(inf));
988  stop = true;
989  break;
990  }
991  }
992 
993  int dbgcount = 0;
994  if (status == evf::EvFDaqDirector::noFile) {
996  dbgcount++;
997  if (!(dbgcount % 20))
998  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
999  if (!useFileBroker_)
1000  usleep(100000);
1001  else {
1002  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
1003  //backoff_exp=0; // disabled!
1004  int sleeptime = (int)(100000. * pow(2, backoff_exp));
1005  usleep(sleeptime);
1006  backoff_exp++;
1007  }
1008  } else
1009  backoff_exp = 0;
1010  }
1011  //end of file grab loop, parse result
1012  if (status == evf::EvFDaqDirector::newFile) {
1014  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
1015 
1016  std::string rawFile;
1017  //file service will report raw extension
1018  if (useFileBroker_ || rawHeaderSize)
1019  rawFile = nextFile;
1020  else {
1021  std::filesystem::path rawFilePath(nextFile);
1022  rawFile = rawFilePath.replace_extension(".raw").string();
1023  }
1024 
1025  struct stat st;
1026  int stat_res = stat(rawFile.c_str(), &st);
1027  if (stat_res == -1) {
1028  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
1029  setExceptionState_ = true;
1030  break;
1031  }
1032  uint64_t fileSize = st.st_size;
1033 
1034  if (fms_) {
1038  }
1039  int eventsInNewFile;
1040  if (fileListMode_) {
1041  if (fileSize == 0)
1042  eventsInNewFile = 0;
1043  else
1044  eventsInNewFile = -1;
1045  } else {
1047  if (!useFileBroker_) {
1048  if (rawHeaderSize) {
1049  int rawFdEmpty = -1;
1050  uint16_t rawHeaderCheck;
1051  bool fileFound;
1052  eventsInNewFile = daqDirector_->grabNextJsonFromRaw(
1053  nextFile, rawFdEmpty, rawHeaderCheck, fileSizeFromMetadata, fileFound, 0, true);
1054  assert(fileFound && rawHeaderCheck == rawHeaderSize);
1056  } else
1057  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1058  } else
1059  eventsInNewFile = serverEventsInNewFile;
1060  assert(eventsInNewFile >= 0);
1061  assert((eventsInNewFile > 0) ==
1062  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1063  }
1064 
1065  if (!singleBufferMode_) {
1066  //calculate number of needed chunks
1067  unsigned int neededChunks = fileSize / eventChunkSize_;
1068  if (fileSize % eventChunkSize_)
1069  neededChunks++;
1070 
1071  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1072  ls,
1073  rawFile,
1074  !fileListMode_,
1075  rawFd,
1076  fileSize,
1077  rawHeaderSize,
1078  neededChunks,
1079  eventsInNewFile,
1080  this));
1082  auto newInputFilePtr = newInputFile.get();
1083  fileQueue_.push(std::move(newInputFile));
1084 
1085  for (unsigned int i = 0; i < neededChunks; i++) {
1086  if (fms_) {
1087  bool copy_active = false;
1088  for (auto j : tid_active_)
1089  if (j)
1090  copy_active = true;
1091  if (copy_active)
1093  else
1095  }
1096  //get thread
1097  unsigned int newTid = 0xffffffff;
1098  while (!workerPool_.try_pop(newTid)) {
1099  usleep(100000);
1100  if (quit_threads_.load(std::memory_order_relaxed)) {
1101  stop = true;
1102  break;
1103  }
1104  }
1105 
1106  if (fms_) {
1107  bool copy_active = false;
1108  for (auto j : tid_active_)
1109  if (j)
1110  copy_active = true;
1111  if (copy_active)
1113  else
1115  }
1116  InputChunk* newChunk = nullptr;
1117  while (!freeChunks_.try_pop(newChunk)) {
1118  usleep(100000);
1119  if (quit_threads_.load(std::memory_order_relaxed)) {
1120  stop = true;
1121  break;
1122  }
1123  }
1124 
1125  if (newChunk == nullptr) {
1126  //return unused tid if we received shutdown (nullptr chunk)
1127  if (newTid != 0xffffffff)
1128  workerPool_.push(newTid);
1129  stop = true;
1130  break;
1131  }
1132  if (stop)
1133  break;
1135 
1136  std::unique_lock<std::mutex> lk(mReader_);
1137 
1138  unsigned int toRead = eventChunkSize_;
1139  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1140  toRead = fileSize % eventChunkSize_;
1141  newChunk->reset(i * eventChunkSize_, toRead, i);
1142 
1143  workerJob_[newTid].first = newInputFilePtr;
1144  workerJob_[newTid].second = newChunk;
1145 
1146  //wake up the worker thread
1147  cvReader_[newTid]->notify_one();
1148  }
1149  } else {
1150  if (!eventsInNewFile) {
1151  if (rawFd) {
1152  close(rawFd);
1153  rawFd = -1;
1154  }
1155  //still queue file for lumi update
1156  std::unique_lock<std::mutex> lkw(mWakeup_);
1157  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1158  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1159  ls,
1160  rawFile,
1161  !fileListMode_,
1162  rawFd,
1163  fileSize,
1164  rawHeaderSize,
1165  (rawHeaderSize > 0),
1166  0,
1167  this));
1169  fileQueue_.push(std::move(newInputFile));
1170  cvWakeup_.notify_one();
1171  break;
1172  }
1173  //in single-buffer mode put single chunk in the file and let the main thread read the file
1174  InputChunk* newChunk;
1175  //should be available immediately
1176  while (!freeChunks_.try_pop(newChunk)) {
1177  usleep(100000);
1178  if (quit_threads_.load(std::memory_order_relaxed))
1179  break;
1180  }
1181 
1182  std::unique_lock<std::mutex> lkw(mWakeup_);
1183 
1184  unsigned int toRead = eventChunkSize_;
1185  if (fileSize % eventChunkSize_)
1186  toRead = fileSize % eventChunkSize_;
1187  newChunk->reset(0, toRead, 0);
1188  newChunk->readComplete_ = true;
1189 
1190  //push file and wakeup main thread
1191  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1192  ls,
1193  rawFile,
1194  !fileListMode_,
1195  rawFd,
1196  fileSize,
1197  rawHeaderSize,
1198  1,
1199  eventsInNewFile,
1200  this));
1201  newInputFile->chunks_[0] = newChunk;
1203  fileQueue_.push(std::move(newInputFile));
1204  cvWakeup_.notify_one();
1205  }
1206  }
1207  }
1209  //make sure threads finish reading
1210  unsigned numFinishedThreads = 0;
1211  while (numFinishedThreads < workerThreads_.size()) {
1212  unsigned tid = 0;
1213  while (!workerPool_.try_pop(tid)) {
1214  usleep(10000);
1215  }
1216  std::unique_lock<std::mutex> lk(mReader_);
1217  thread_quit_signal[tid] = true;
1218  cvReader_[tid]->notify_one();
1219  numFinishedThreads++;
1220  }
1221  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1222  workerThreads_[i]->join();
1223  delete workerThreads_[i];
1224  }
1225 }
std::condition_variable cvWakeup_
tbb::concurrent_queue< unsigned int > workerPool_
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
friend struct InputFile
Open Root file and provide MEs ############.
volatile std::atomic< bool > shutdown_flag
def ls
Definition: eostools.py:349
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
list status
Definition: mps_update.py:107
std::atomic< bool > quit_threads_
std::vector< ReaderInfo > workerJob_
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Log< level::Error, false > LogError
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
def move
Definition: eostools.py:511
string inf
Definition: EcalCondDB.py:96
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
void setMonStateSup(evf::FastMonState::InputState state)
int grabNextJsonFileAndUnlock(std::filesystem::path const &jsonSourcePath)
T min(T a, T b)
Definition: MathUtil.h:58
std::vector< std::thread * > workerThreads_
std::atomic< bool > readComplete_
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint16_t &rawHeaderSize, uint64_t &lockWaitTime, bool &setExceptionState)
int grabNextJsonFromRaw(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, int64_t &fileSizeFromHeader, bool &fileFound, uint32_t serverLS, bool closeFile)
unsigned int getLumisectionToStart() const
unsigned long long uint64_t
Definition: Time.h:13
std::condition_variable startupCv_
void stoppedLookingForFile(unsigned int lumi)
evf::EvFDaqDirector * daqDirector_
static std::atomic< unsigned int > counter
std::vector< unsigned int > thread_quit_signal
std::atomic< unsigned int > readingFilesCount_
unsigned int getStartLumisectionFromEnv() const
evf::FastMonitoringService * fms_
std::vector< unsigned int > tid_active_
tbb::concurrent_queue< InputChunk * > freeChunks_
Log< level::Warning, false > LogWarning
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)
void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1227 of file FedRawDataInputSource.cc.

References cms::cuda::assert(), InputChunk::buf_, InputFile::chunks_, cvReader_, detectedFRDversion_, change_name::diff, dataset::end, eventChunkBlock_, mergeVDriftHistosByStation::file, InputChunk::fileIndex_, InputFile::fileName_, InputFile::fileSize_, first, FRDHeaderMaxVersion, mps_fire::i, if(), init, dqmdumpme::last, LogDebug, min(), mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, InputFile::rawFd_, InputFile::rawHeaderSize_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

1227  {
1228  bool init = true;
1229  threadInit_.exchange(true, std::memory_order_acquire);
1230 
1231  while (true) {
1232  tid_active_[tid] = false;
1233  std::unique_lock<std::mutex> lk(mReader_);
1234  workerJob_[tid].first = nullptr;
1235  workerJob_[tid].first = nullptr;
1236 
1237  assert(!thread_quit_signal[tid]); //should never get it here
1238  workerPool_.push(tid);
1239 
1240  if (init) {
1241  std::unique_lock<std::mutex> lk(startupLock_);
1242  init = false;
1243  startupCv_.notify_one();
1244  }
1245  cvReader_[tid]->wait(lk);
1246 
1247  if (thread_quit_signal[tid])
1248  return;
1249  tid_active_[tid] = true;
1250 
1251  InputFile* file;
1252  InputChunk* chunk;
1253 
1254  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1255 
1256  file = workerJob_[tid].first;
1257  chunk = workerJob_[tid].second;
1258 
1259  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1260  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1261 
1262  //if only one worker thread exists, use single fd for all operations
1263  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1264  int fileDescriptor;
1265  bool fileOpenedHere = false;
1266 
1267  if (numConcurrentReads_ == 1) {
1268  fileDescriptor = file->rawFd_;
1269  if (fileDescriptor == -1) {
1270  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1271  fileOpenedHere = true;
1272  file->rawFd_ = fileDescriptor;
1273  }
1274  } else {
1275  if (chunk->offset_ == 0) {
1276  fileDescriptor = file->rawFd_;
1277  file->rawFd_ = -1;
1278  if (fileDescriptor == -1) {
1279  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1280  fileOpenedHere = true;
1281  }
1282  } else {
1283  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1284  fileOpenedHere = true;
1285  }
1286  }
1287 
1288  if (fileDescriptor < 0) {
1289  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1290  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1291  setExceptionState_ = true;
1292  continue;
1293  }
1294  if (fileOpenedHere) { //fast forward to this chunk position
1295  off_t pos = 0;
1296  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1297  if (pos == -1) {
1298  edm::LogError("FedRawDataInputSource")
1299  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1300  << chunk->offset_ << " error: " << strerror(errno);
1301  setExceptionState_ = true;
1302  continue;
1303  }
1304  }
1305 
1306  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1307  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1308 
1309  unsigned int skipped = bufferLeft;
1311  for (unsigned int i = 0; i < readBlocks_; i++) {
1312  ssize_t last;
1313 
1314  //protect against reading into next block
1315  last = ::read(
1316  fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1317 
1318  if (last < 0) {
1319  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1320  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1321  setExceptionState_ = true;
1322  break;
1323  }
1324  if (last > 0)
1325  bufferLeft += last;
1326  if (last < eventChunkBlock_) { //last read
1327  //check if this is last block, then total read size must match file size
1328  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1329  edm::LogError("FedRawDataInputSource")
1330  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1331  << " expectedChunkSize:" << chunk->usedSize_
1332  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1333  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1334  setExceptionState_ = true;
1335  }
1336  break;
1337  }
1338  }
1339  if (setExceptionState_)
1340  continue;
1341 
1343  auto diff = end - start;
1344  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1345  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1346  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1347  << " GB/s)";
1348 
1349  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1350  close(fileDescriptor);
1351  fileDescriptor = -1;
1352  if (numConcurrentReads_ == 1)
1353  file->rawFd_ = -1;
1354  }
1355  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1356  close(fileDescriptor);
1357 
1358  //detect FRD event version. Skip file Header if it exists
1359  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1360  detectedFRDversion_ = *((uint16_t*)(chunk->buf_ + file->rawHeaderSize_));
1361  }
1363  chunk->readComplete_ =
1364  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1365  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1366  }
1367 }
void read(edm::EventPrincipal &eventPrincipal) override
constexpr size_t FRDHeaderMaxVersion
unsigned int offset_
tbb::concurrent_queue< unsigned int > workerPool_
int init
Definition: HydjetWrapper.h:64
std::vector< ReaderInfo > workerJob_
uint16_t rawHeaderSize_
Log< level::Error, false > LogError
assert(be >=bs)
std::vector< std::condition_variable * > cvReader_
U second(std::pair< T, U > const &p)
unsigned char * buf_
if(conf_.getParameter< bool >("UseStripCablingDB"))
T min(T a, T b)
Definition: MathUtil.h:58
unsigned int fileIndex_
std::atomic< bool > readComplete_
std::string fileName_
std::condition_variable startupCv_
tbb::concurrent_vector< InputChunk * > chunks_
std::atomic< bool > threadInit_
string end
Definition: dataset.py:937
std::vector< unsigned int > thread_quit_signal
std::vector< unsigned int > tid_active_
tuple last
Definition: dqmdumpme.py:56
#define LogDebug(id)
void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1532 of file FedRawDataInputSource.cc.

References CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextEvent().

1532  {
1533  std::lock_guard<std::mutex> lock(monlock_);
1534  auto itr = sourceEventsReport_.find(lumi);
1535  if (itr != sourceEventsReport_.end())
1536  itr->second += events;
1537  else
1539 }
std::map< unsigned int, unsigned int > sourceEventsReport_
list lumi
Definition: dqmdumpme.py:53
tuple events
Definition: patZpeak.py:20
void FedRawDataInputSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::InputSource.

Definition at line 748 of file FedRawDataInputSource.cc.

748 {}
void FedRawDataInputSource::setMonState ( evf::FastMonState::InputState  state)
inlineprotected

Definition at line 1374 of file FedRawDataInputSource.cc.

References fms_, and evf::FastMonitoringService::setInState().

Referenced by InputFile::advance(), checkNext(), getNextEvent(), and read().

1374  {
1375  if (fms_)
1376  fms_->setInState(state);
1377 }
ItemType state() const
Definition: InputSource.h:332
evf::FastMonitoringService * fms_
void setInState(FastMonState::InputState inputState)
void FedRawDataInputSource::setMonStateSup ( evf::FastMonState::InputState  state)
inlineprotected

Definition at line 1379 of file FedRawDataInputSource.cc.

References fms_, and evf::FastMonitoringService::setInStateSup().

Referenced by readSupervisor().

1379  {
1380  if (fms_)
1382 }
ItemType state() const
Definition: InputSource.h:332
void setInStateSup(FastMonState::InputState inputState)
evf::FastMonitoringService * fms_
void FedRawDataInputSource::threadError ( )
private

Definition at line 1369 of file FedRawDataInputSource.cc.

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

1369  {
1370  quit_threads_ = true;
1371  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1372 }
std::atomic< bool > quit_threads_

Friends And Related Function Documentation

friend struct InputChunk
friend

Definition at line 42 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

friend struct InputFile
friend

Open Root file and provide MEs ############.

Definition at line 41 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 98 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 179 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 170 of file FedRawDataInputSource.h.

Referenced by read().

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 121 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), maybeOpenNewLumiSection(), and read().

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private
std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = nullptr
private
const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

std::string FedRawDataInputSource::defPath_
private

Definition at line 86 of file FedRawDataInputSource.h.

uint16_t FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 140 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by checkNext(), fillFEDRawDataCollection(), getNextEvent(), and read().

unsigned int FedRawDataInputSource::eventChunkBlock_
private
unsigned int FedRawDataInputSource::eventChunkSize_
private
edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by read().

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 122 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 126 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and read().

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 127 of file FedRawDataInputSource.h.

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 178 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by getFile().

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 108 of file FedRawDataInputSource.h.

Referenced by checkNext(), FedRawDataInputSource(), getFile(), and read().

const bool FedRawDataInputSource::fileListMode_
private
std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 166 of file FedRawDataInputSource.h.

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 152 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private
evf::FastMonitoringService* FedRawDataInputSource::fms_ = nullptr
private
tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 151 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 112 of file FedRawDataInputSource.h.

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 97 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and readSupervisor().

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 123 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 124 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by getFile().

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

uint16_t FedRawDataInputSource::MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and fillFEDRawDataCollection().

uint16_t FedRawDataInputSource::MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID
private

Definition at line 129 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and fillFEDRawDataCollection().

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 184 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by read().

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

unsigned int FedRawDataInputSource::numConcurrentReads_
private
edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

std::atomic<bool> FedRawDataInputSource::quit_threads_
private
unsigned int FedRawDataInputSource::readBlocks_
private
std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 145 of file FedRawDataInputSource.h.

Referenced by checkNext(), and ~FedRawDataInputSource().

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by checkNext(), and FedRawDataInputSource().

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 177 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 183 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by checkNext(), and ~FedRawDataInputSource().

std::condition_variable FedRawDataInputSource::startupCv_
private
std::mutex FedRawDataInputSource::startupLock_
private
std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by read().

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 125 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

const std::vector<unsigned int> FedRawDataInputSource::testTCDSFEDRange_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private
std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 181 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 156 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

bool FedRawDataInputSource::useFileBroker_
private
const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by read().

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 99 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 149 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 148 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private