CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
DAQSource Class Reference

#include <DAQSource.h>

Inheritance diagram for DAQSource:
edm::RawInputSource edm::InputSource

Public Member Functions

int currentLumiSection () const
 
 DAQSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
int eventRunNumber () const
 
bool fileListLoopMode ()
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
void makeEventWrapper (edm::EventPrincipal &eventPrincipal, edm::EventAuxiliary &aux)
 
edm::ProcessHistoryID & processHistoryID ()
 
bool useL1EventID () const
 
 ~DAQSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemTypeInfo nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSource & operator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Marks the current event as cached (sets eventCached_). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemTypeInfo state () const
 

Private Types

typedef std::pair< RawInputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void dataArranger ()
 
bool exceptionState ()
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextDataBlock ()
 
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< RawInputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = nullptr
 
std::unique_ptr< std::thread > dataArrangerThread_
 
std::shared_ptr< DataMode > dataMode_
 
const std::string dataModeConfig_
 
uint64_t eventChunkBlock_
 
uint64_t eventChunkSize_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
uint32_t GTPEventID_ = 0
 
std::vector< std::string > listFileNames_
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint64_t maxChunkSize_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
std::default_random_engine rng_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
class RawInputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemPosition : char { ItemPosition::Invalid, ItemPosition::LastItemToBeMerged, ItemPosition::NotLastItemToBeMerged }
 
enum  ItemType : char {
  ItemType::IsInvalid, ItemType::IsStop, ItemType::IsFile, ItemType::IsRun,
  ItemType::IsLumi, ItemType::IsEvent, ItemType::IsRepeat, ItemType::IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 41 of file DAQSource.h.

Member Typedef Documentation

◆ ReaderInfo

typedef std::pair<RawInputFile*, InputChunk*> DAQSource::ReaderInfo
private

Definition at line 132 of file DAQSource.h.

Constructor & Destructor Documentation

◆ DAQSource()

DAQSource::DAQSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 32 of file DAQSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, dataMode_, dataModeConfig_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, evf::EvFDaqDirector::getBUBaseDirs(), evf::EvFDaqDirector::getBUBaseDirsNSources(), mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), maxChunkSize_, FEDNumbering::MAXTCDSuTCAFEDID, FEDNumbering::MINTCDSuTCAFEDID, numBuffers_, numConcurrentReads_, Utilities::operator, evf::EvFDaqDirector::overrideRunNumber(), processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), muonDTDigis_cfi::pset, quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::runString(), evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, tid_active_, workerJob_, and workerThreads_.

34  dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
35  eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
36  maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
37  eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
38  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
39  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
40  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
41  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
42  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
43  testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
44  listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
45  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
46  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
50  eventsThisLumi_(0),
51  rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
52  char thishost[256];
53  gethostname(thishost, 255);
54 
55  if (maxChunkSize_ == 0)
57  else if (maxChunkSize_ < eventChunkSize_)
58  throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
59 
60  if (eventChunkBlock_ == 0)
63  throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
64 
65  edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
66  << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
67 
68  uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
69  uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
70 
71  if (!testTCDSFEDRange_.empty()) {
72  if (testTCDSFEDRange_.size() != 2) {
73  throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
74  }
75  MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
76  MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
77  }
78 
79  //load mode class based on parameter
80  if (dataModeConfig_ == "FRD") {
81  dataMode_.reset(new DataModeFRD(this));
82  } else if (dataModeConfig_ == "FRDStriped") {
83  dataMode_.reset(new DataModeFRDStriped(this));
84  } else if (dataModeConfig_ == "ScoutingRun3") {
85  dataMode_.reset(new DataModeScoutingRun3(this));
86  } else
87  throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
88 
90 
91  dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
92  dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
93 
94  long autoRunNumber = -1;
95  if (fileListMode_) {
96  autoRunNumber = initFileList();
97  if (!fileListLoopMode_) {
98  if (autoRunNumber < 0)
99  throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
100  //override run number
101  runNumber_ = (edm::RunNumber_t)autoRunNumber;
102  daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
103  }
104  }
105 
106  dataMode_->makeDirectoryEntries(
108 
109  auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
110  for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
111  processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
112  setNewRun();
113  //todo:autodetect from file name (assert if names differ)
115 
116  //make sure that chunk size is N * block size
121 
122  if (!numBuffers_)
123  throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
124 
126  assert(numBuffers_ > 1);
127  readingFilesCount_ = 0;
128 
129  if (!crc32c_hw_test())
130  edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
131 
132  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
133  if (fileListMode_) {
134  try {
136  } catch (cms::Exception const&) {
137  edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
138  }
139  } else {
141  if (!fms_) {
142  throw cms::Exception("DAQSource") << "FastMonitoringService not found";
143  }
144  }
145 
147  if (!daqDirector_)
148  cms::Exception("DAQSource") << "EvFDaqDirector not found";
149 
150  edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
151  //set DaqDirector to delete files in preGlobalEndLumi callback
153  if (fms_) {
155  fms_->setInputSource(this);
158  }
159  //should delete chunks when run stops
160  for (unsigned int i = 0; i < numBuffers_; i++) {
162  }
163 
164  quit_threads_ = false;
165 
166  //prepare data shared by threads
167  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
168  thread_quit_signal.push_back(false);
169  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
170  cvReader_.push_back(std::make_unique<std::condition_variable>());
171  tid_active_.push_back(0);
172  }
173 
174  //start threads
175  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
176  //wait for each thread to complete initialization
177  std::unique_lock<std::mutex> lk(startupLock_);
178  workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
179  startupCv_.wait(lk);
180  }
181 
182  runAuxiliary()->setProcessHistoryID(processHistoryID_);
183 }
long initFileList()
Definition: DAQSource.cc:1400
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
Definition: DAQSource.h:132
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::condition_variable startupCv_
Definition: DAQSource.h:156
const bool useL1EventID_
Definition: DAQSource.h:103
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
const std::string dataModeConfig_
Definition: DAQSource.h:90
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
assert(be >=bs)
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
uint64_t eventChunkSize_
Definition: DAQSource.h:91
static Timestamp beginOfTime()
Definition: Timestamp.h:77
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
friend struct InputChunk
Definition: DAQSource.h:43
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void readWorker(unsigned int tid)
Definition: DAQSource.cc:1064
unsigned int numBuffers_
Definition: DAQSource.h:95
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
const bool fileListMode_
Definition: DAQSource.h:109
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
const std::vector< unsigned int > testTCDSFEDRange_
Definition: DAQSource.h:104
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
std::vector< int > const & getBUBaseDirsNSources() const
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
unsigned int readBlocks_
Definition: DAQSource.h:94
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
void setFMS(evf::FastMonitoringService *fms)
const bool verifyChecksum_
Definition: DAQSource.h:102
std::vector< std::string > const & getBUBaseDirs() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159
void setInState(FastMonState::InputState inputState)

◆ ~DAQSource()

DAQSource::~DAQSource ( )
override

Definition at line 185 of file DAQSource.cc.

References currentFile_, cvReader_, evf::FastMonitoringService::exceptionDetected(), fileDeleteLock_, filesToDelete_, fms_, mps_fire::i, evf::FastMonitoringService::isExceptionOnData(), mReader_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

185  {
186  quit_threads_ = true;
187 
188  //delete any remaining open files
189  if (!fms_ || !fms_->exceptionDetected()) {
190  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
191  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
192  it->second.reset();
193  } else {
194  //skip deleting files with exception
195  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
196  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
197  if (fms_->isExceptionOnData(it->second->lumi_))
198  it->second->unsetDeleteFile();
199  else
200  it->second.reset();
201  }
202  //disable deleting current file with exception
203  if (currentFile_.get())
204  if (fms_->isExceptionOnData(currentFile_->lumi_))
205  currentFile_->unsetDeleteFile();
206  }
207 
209  readSupervisorThread_->join();
210  } else {
211  //join aux threads in case the supervisor thread was not started
212  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
213  std::unique_lock<std::mutex> lk(mReader_);
214  thread_quit_signal[i] = true;
215  cvReader_[i]->notify_one();
216  lk.unlock();
217  workerThreads_[i]->join();
218  delete workerThreads_[i];
219  }
220  }
221 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
std::mutex mReader_
Definition: DAQSource.h:148
bool isExceptionOnData(unsigned int ls)
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
bool startedSupervisorThread_
Definition: DAQSource.h:137
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next DAQSource::checkNext ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 254 of file DAQSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, dataMode_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, evf::EvFDaqDirector::getEoRFilePathOnFU(), getNextDataBlock(), getNextEventFromDataBlock(), evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, svgfig::load(), evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), edm::shutdown_flag, startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, and mps_update::status.

254  {
256  std::unique_lock<std::mutex> lk(startupLock_);
257 
258  //this thread opens new files and dispatches reading to worker readers
259  readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
261 
262  startupCv_.wait(lk);
263  }
264 
265  //signal hltd to start event accounting
266  if (!currentLumiSection_)
269 
270  auto nextEvent = [this]() {
271  auto getNextEvent = [this]() {
272  //for some models this is always true (if one event is one block)
273  if (dataMode_->dataBlockCompleted()) {
274  return getNextDataBlock();
275  } else {
276  return getNextEventFromDataBlock();
277  }
278  };
279 
281  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
282  if (edm::shutdown_flag.load(std::memory_order_relaxed))
283  break;
284  }
285  return status;
286  };
287 
288  switch (nextEvent()) {
290  //maybe create EoL file in working directory before ending run
291  struct stat buf;
292  //also create EoR file in FU data directory
293  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
294  if (!eorFound) {
295  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
296  O_RDWR | O_CREAT,
297  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
298  close(eor_fd);
299  }
301  eventsThisLumi_ = 0;
303  edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
304  return Next::kStop;
305  }
307  //this is not reachable
308  return Next::kEvent;
309  }
311  //std::cout << "--------------NEW LUMI---------------" << std::endl;
312  return Next::kEvent;
313  }
314  default: {
317  else
318  eventRunNumber_ = dataMode_->run();
319 
320  setEventCached();
321 
322  return Next::kEvent;
323  }
324  }
325 }
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
std::mutex startupLock_
Definition: DAQSource.h:155
std::condition_variable startupCv_
Definition: DAQSource.h:156
volatile std::atomic< bool > shutdown_flag
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1379
unsigned int currentLumiSection_
Definition: DAQSource.h:119
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:347
std::string getEoRFilePathOnFU() const
evf::EvFDaqDirector::FileStatus getNextDataBlock()
Definition: DAQSource.cc:373
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
void setEventCached()
Marks the current event as cached (sets eventCached_).
Definition: InputSource.h:391
const bool fileListMode_
Definition: DAQSource.h:109
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
uint32_t eventRunNumber_
Definition: DAQSource.h:120
def load(fileName)
Definition: svgfig.py:547
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
bool startedSupervisorThread_
Definition: DAQSource.h:137
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1328
void readSupervisor()
Definition: DAQSource.cc:593

◆ currentLumiSection()

int DAQSource::currentLumiSection ( ) const
inline

◆ dataArranger()

void DAQSource::dataArranger ( )
private

Definition at line 591 of file DAQSource.cc.

591 {}

◆ eventRunNumber()

int DAQSource::eventRunNumber ( ) const
inline

Definition at line 53 of file DAQSource.h.

References eventRunNumber_.

Referenced by DataModeFRD::readEvent(), DataModeScoutingRun3::readEvent(), and DataModeFRDStriped::readEvent().

53 { return eventRunNumber_; }
uint32_t eventRunNumber_
Definition: DAQSource.h:120

◆ exceptionState()

bool DAQSource::exceptionState ( )
inline private

Definition at line 78 of file DAQSource.h.

References setExceptionState_.

Referenced by RawInputFile::advance().

78 { return setExceptionState_; }
bool setExceptionState_
Definition: DAQSource.h:154

◆ fileListLoopMode()

bool DAQSource::fileListLoopMode ( )
inline

Definition at line 57 of file DAQSource.h.

References fileListLoopMode_.

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

57 { return fileListLoopMode_; }
const bool fileListLoopMode_
Definition: DAQSource.h:111

◆ fillDescriptions()

void DAQSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 223 of file DAQSource.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

223  {
225  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
226  desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
227  desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
228  desc.addUntracked<unsigned int>("maxChunkSize", 0)
229  ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
230  desc.addUntracked<unsigned int>("eventChunkBlock", 0)
231  ->setComment(
232  "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
233  "0 is specified, use chunk size.");
234 
235  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
236  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
237  ->setComment("Maximum number of simultaneously buffered raw files");
238  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
239  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
240  desc.addUntracked<bool>("verifyChecksum", true)
241  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
242  desc.addUntracked<bool>("useL1EventID", false)
243  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
244  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
245  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
246  desc.addUntracked<bool>("fileListMode", false)
247  ->setComment("Use fileNames parameter to directly specify raw files to open");
248  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
249  ->setComment("file list used when fileListMode is enabled");
250  desc.setAllowAnything();
251  descriptions.add("source", desc);
252 }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ getEventReport()

std::pair< bool, unsigned int > DAQSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1388 of file DAQSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

// Body of DAQSource::getEventReport(lumi, erase).
// Looks up `lumi` in sourceEventsReport_ while holding monlock_ for the whole call.
// Returns {true, stored count} when an entry exists, removing that entry first
// when `erase` is true; returns {false, 0} when there is no entry for `lumi`.
1388  {
1389  std::lock_guard<std::mutex> lock(monlock_);
1390  auto itr = sourceEventsReport_.find(lumi);
1391  if (itr != sourceEventsReport_.end()) {
// copy the value out before the optional erase invalidates `itr`
1392  std::pair<bool, unsigned int> ret(true, itr->second);
1393  if (erase)
1394  sourceEventsReport_.erase(itr);
1395  return ret;
1396  } else
1397  return std::pair<bool, unsigned int>(false, 0);
1398 }
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
ret
prodAgent to be discontinued
std::mutex monlock_
Definition: DAQSource.h:174

◆ getFile()

evf::EvFDaqDirector::FileStatus DAQSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint64_t &  lockWaitTime 
)
private

Definition at line 1434 of file DAQSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, listFileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

// Body of DAQSource::getFile(ls, nextFile, lockWaitTime).
// Takes the next entry of listFileNames_ (at fileListIndex_), strips a leading
// "file://" or "file:" prefix, derives `ls` from the file stem (or from
// loopModeIterationInc_ in loop mode) and advances fileListIndex_.
// When the list is exhausted and fileListLoopMode_ is set, the index is reset to 0
// and the function calls itself again.
// `lockWaitTime` is not written by any line visible here.
// NOTE(review): listing lines 1456, 1459 and 1462 are absent from this rendering;
// presumably they hold the return statements and the loop-iteration increment
// (the reference list names newFile, runEnded and loopModeIterationInc_) - confirm
// against DAQSource.cc.
1434  {
1435  if (fileListIndex_ < listFileNames_.size()) {
1436  nextFile = listFileNames_[fileListIndex_];
1437  if (nextFile.find("file://") == 0)
1438  nextFile = nextFile.substr(7);
1439  else if (nextFile.find("file:") == 0)
1440  nextFile = nextFile.substr(5);
1441  std::filesystem::path fileName = nextFile;
1442  std::string fileStem = fileName.stem().string();
// NOTE(review): std::string::find returns npos (non-zero) when the pattern is absent
// and 0 when it is at the start, so this condition is true for "not found" and false
// for "found at position 0". With "ls" absent, find(...) + 2 wraps around to 1 and the
// first character is dropped; with "ls" at position 0 the prefix is not stripped.
// A comparison against std::string::npos looks like the intent - confirm.
1443  if (fileStem.find("ls"))
1444  fileStem = fileStem.substr(fileStem.find("ls") + 2);
// NOTE(review): same truthiness pattern; here an absent '_' yields substr(0, npos),
// which keeps the whole string, and a leading '_' skips the truncation.
1445  if (fileStem.find('_'))
1446  fileStem = fileStem.substr(0, fileStem.find('_'));
1447 
1448  if (!fileListLoopMode_)
// std::stoul throws if fileStem does not begin with a number; no handler is visible here
1449  ls = std::stoul(fileStem);
1450  else //always starting from LS 1 in loop mode
1451  ls = 1 + loopModeIterationInc_;
1452 
1453  //fsize = 0;
1454  //lockWaitTime = 0;
1455  fileListIndex_++;
1457  } else {
1458  if (!fileListLoopMode_)
1460  else {
1461  //loop through files until interrupted
1463  fileListIndex_ = 0;
1464  return getFile(ls, nextFile, lockWaitTime);
1465  }
1466  }
1467 }
unsigned int loopModeIterationInc_
Definition: DAQSource.h:112
unsigned int fileListIndex_
Definition: DAQSource.h:110
const bool fileListLoopMode_
Definition: DAQSource.h:111
def ls(path, rec=False)
Definition: eostools.py:349
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1434

◆ getNextDataBlock()

evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock ( )
inlineprivate

Definition at line 373 of file DAQSource.cc.

References cms::cuda::assert(), chunkIsFree_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, dataMode_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, freeChunks_, getNextEventFromDataBlock(), evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonState(), mps_update::status, threadError(), and mps_check::timeout.

Referenced by checkNext().

// Body of DAQSource::getNextDataBlock().
// Outline of what the visible lines do:
//  - call threadError() when setExceptionState_ is set;
//  - when there is no current file, try to pop one from fileQueue_, otherwise wait
//    up to 100 ms on cvWakeup_; then branch on the popped file's status_
//    (runAbort throws, newLumi resets the event counter and drops the file object,
//    any status not matched by a branch hits assert(false));
//  - handle an empty file, a fully consumed file (last chunk returned to freeChunks_,
//    event count cross-checked against nEvents_, file object queued in filesToDelete_
//    or reset), and the optional raw file header;
//  - wait for the current chunk, read the event header, build the data-block view and
//    make header + payload contiguous, copying across chunks when needed;
//  - finally return getNextEventFromDataBlock().
// NOTE(review): many listing lines (e.g. 377-378, 385, 389-390, 396, 398, 400, 405,
// 409, 413, 417, 419, 423, 428, 436, 445, 458, 460, 476, 485, 519, 528) are absent
// from this rendering, including branch heads and return statements; read this
// listing together with DAQSource.cc.
373  {
374  if (setExceptionState_)
375  threadError();
376  if (!currentFile_.get()) {
379  {
380  IdleSourceSentry ids(fms_);
381  if (!fileQueue_.try_pop(currentFile_)) {
382  //sleep until wakeup (only in single-buffer mode) or timeout
383  std::unique_lock<std::mutex> lkw(mWakeup_);
384  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
386  }
387  }
388  status = currentFile_->status_;
391  currentFile_.reset();
392  return status;
393  } else if (status == evf::EvFDaqDirector::runAbort) {
394  throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
395  } else if (status == evf::EvFDaqDirector::newLumi) {
397  if (currentFile_->lumi_ > currentLumiSection_) {
399  eventsThisLumi_ = 0;
401  }
402  currentFile_.reset();
403  return status;
404  } else if (status == evf::EvFDaqDirector::newFile) {
406  } else
407  assert(false);
408  }
410 
411  //file is empty
412  if (!currentFile_->fileSize_) {
414  //try to open new lumi
415  assert(currentFile_->nChunks_ == 0);
416  if (currentFile_->lumi_ > currentLumiSection_) {
418  eventsThisLumi_ = 0;
420  }
421  //immediately delete empty file
422  currentFile_.reset();
424  }
425 
426  //file is finished
427  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
429  //release last chunk (it is never released elsewhere)
430  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
// a negative nEvents_ skips this cross-check
431  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
432  throw cms::Exception("DAQSource::getNextDataBlock")
433  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
434  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
435  }
437  //put the file in pending delete list;
438  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
439  filesToDelete_.push_back(
440  std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
441  } else {
442  //in single-thread and stream jobs, events are already processed
443  currentFile_.reset();
444  }
446  }
447 
448  //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
449 
450  //handle RAW file header
451  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
452  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
453  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
454  throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
455 
456  edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
457  if (currentFile_->lumi_ > currentLumiSection_) {
459  eventsThisLumi_ = 0;
461  }
462  }
463 
464  //advance buffer position to skip file header (chunk will be acquired later)
465  currentFile_->advance(currentFile_->rawHeaderSize_);
466  }
467 
468  //file is too short to fit event header
469  if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
470  throw cms::Exception("DAQSource::getNextDataBlock")
471  << "Premature end of input file while reading event header. Missing: "
472  << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
473 
474  //multibuffer mode
475  //wait for the current chunk to become added to the vector
477  {
478  IdleSourceSentry ids(fms_);
// polls every 10 ms; threadError() is called from inside the wait when setExceptionState_ is set
479  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
480  usleep(10000);
481  if (setExceptionState_)
482  threadError();
483  }
484  }
486 
487  chunkIsFree_ = false;
488  bool chunkEnd;
489  unsigned char* dataPosition;
490 
491  //read event header, copy it to a single chunk if necessary
492  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
493 
494  //get buffer size of current chunk (can be resized)
495  uint64_t currentChunkSize = currentFile_->currentChunkSize();
496 
497  //prepare view based on header that was read
498  dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
499 
500  //check that payload size is within the file
501  const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
502 
// NOTE(review): the exception category below (and in the final sanity check) reads
// "DAQSource::getNextEventDataBlock", unlike the "DAQSource::getNextDataBlock" used
// by the earlier throws in this function - confirm which name is intended.
503  if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
504  throw cms::Exception("DAQSource::getNextEventDataBlock")
505  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
506  << ") while parsing block";
507 
508  //for cross-buffer models
509  if (chunkEnd) {
510  //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
511  currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
512  //mark to release old chunk
513  chunkIsFree_ = true;
514  } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
515  //header was contiguous, but payload does not fit in the chunk
516  //rewind to header start position and then together with payload will be copied together to the old chunk
517  currentFile_->rewindChunk(dataMode_->headerSize());
518 
520  {
521  IdleSourceSentry ids(fms_);
522  //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
523  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
524  assert(chunkEnd);
525  //mark to release old chunk
526  chunkIsFree_ = true;
527  }
529  //header and payload is moved, update view
530  dataMode_->makeDataBlockView(
531  dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
532  } else {
533  //everything is in a single chunk, only move pointers forward
534  chunkEnd = currentFile_->advance(dataPosition, msgSize);
535  assert(!chunkEnd);
// chunkIsFree_ was already set to false before the header read and is not changed on this path
536  chunkIsFree_ = false;
537  }
538 
539  //sanity-check check that the buffer position has not exceeded file size after preparing event
540  if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
541  throw cms::Exception("DAQSource::getNextEventDataBlock")
542  << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
543 
544  //prepare event
545  return getNextEventFromDataBlock();
546 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
int currentFileIndex_
Definition: DAQSource.h:158
void threadError()
Definition: DAQSource.cc:1323
bool chunkIsFree_
Definition: DAQSource.h:135
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: DAQSource.cc:327
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1379
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::mutex mWakeup_
Definition: DAQSource.h:165
unsigned int currentLumiSection_
Definition: DAQSource.h:119
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:347
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
const bool fileListMode_
Definition: DAQSource.h:109
unsigned long long uint64_t
Definition: Time.h:13
bool setExceptionState_
Definition: DAQSource.h:154
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
Log< level::Warning, false > LogWarning
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1328
def move(src, dest)
Definition: eostools.py:511
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159

◆ getNextEventFromDataBlock()

evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock ( )
inlineprivate

Definition at line 347 of file DAQSource.cc.

References currentFile_, currentLumiSection_, dataMode_, Exception, fms_, newFWLiteAna::found, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setMonState(), and verifyChecksum_.

Referenced by checkNext(), and getNextDataBlock().

// Body of DAQSource::getNextEventFromDataBlock().
// Asks dataMode_ for the next event view. When none is found and the data block was
// initialized, the block is marked uninitialized and the file position is moved to the
// end of the file. When verifyChecksum_ is set and the checksum is invalid, a
// cms::Exception carrying dataMode_->getChecksumError() is thrown. Otherwise the
// file's nProcessed_ counter is incremented.
// NOTE(review): listing lines 348, 358-359, 363, 366 and 370 are absent from this
// rendering; presumably they hold the monitoring-state calls and the return
// statements (the reference list names noFile, sameFile and setExceptionDetected) -
// confirm against DAQSource.cc.
347  {
349 
350  bool found = dataMode_->nextEventView();
351  //file(s) completely parsed
352  if (!found) {
353  if (dataMode_->dataBlockInitialized()) {
354  dataMode_->setDataBlockInitialized(false);
355  //roll position to the end of the file to close it
356  currentFile_->bufferPosition_ = currentFile_->fileSize_;
357  }
359  }
360 
361  if (verifyChecksum_ && !dataMode_->checksumValid()) {
362  if (fms_)
364  throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
365  }
367 
368  currentFile_->nProcessed_++;
369 
371 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
void setExceptionDetected(unsigned int ls)
unsigned int currentLumiSection_
Definition: DAQSource.h:119
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1328
const bool verifyChecksum_
Definition: DAQSource.h:102

◆ initFileList()

long DAQSource::initFileList ( )
private

Definition at line 1400 of file DAQSource.cc.

References a, b, mps_fire::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, listFileNames_, castor_dqm_sourceclient_file_cfg::path, jetUpdater_cfi::sort, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by DAQSource().

// Body of DAQSource::initFileList().
// The first fragment is the tail of a comparison callback: each name is cut down to
// the part starting at its last '/' (when it has one) and the two results are compared.
// Afterwards, when listFileNames_ is not empty, a run number is parsed from a file
// stem of the form "run<digits>[_...]" and returned as long; -1 is returned when no
// run number could be detected.
// NOTE(review): listing lines 1401 and 1411 are absent from this rendering;
// presumably they hold the start of the sort call (the reference list names `sort`)
// and the declaration of `fileName` - confirm against DAQSource.cc.
1400  {
1402  if (a.rfind('/') != std::string::npos)
1403  a = a.substr(a.rfind('/'));
1404  if (b.rfind('/') != std::string::npos)
1405  b = b.substr(b.rfind('/'));
1406  return b > a;
1407  });
1408 
1409  if (!listFileNames_.empty()) {
1410  //get run number from first file in the vector
1412  std::string fileStem = fileName.stem().string();
1413  if (fileStem.find("file://") == 0)
1414  fileStem = fileStem.substr(7);
1415  else if (fileStem.find("file:") == 0)
1416  fileStem = fileStem.substr(5);
// `end` is npos when the stem has no '_'; substr then simply runs to the end of the stem
1417  auto end = fileStem.find('_');
1418 
1419  if (fileStem.find("run") == 0) {
1420  std::string runStr = fileStem.substr(3, end - 3);
1421  try {
1422  //get long to support test run numbers < 2^32
1423  long rval = std::stol(runStr);
1424  edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1425  return rval;
1426  } catch (const std::exception&) {
// a parse failure is only logged; control falls through to the final return -1
1427  edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1428  }
1429  }
1430  }
1431  return -1;
1432 }
Log< level::Info, false > LogInfo
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
Log< level::Warning, false > LogWarning

◆ makeEventWrapper()

void DAQSource::makeEventWrapper ( edm::EventPrincipal eventPrincipal,
edm::EventAuxiliary aux 
)
inline

Definition at line 54 of file DAQSource.h.

References printConversionInfo::aux, and edm::RawInputSource::makeEvent().

Referenced by DataModeFRD::readEvent(), DataModeScoutingRun3::readEvent(), and DataModeFRDStriped::readEvent().

// Body of DAQSource::makeEventWrapper(eventPrincipal, aux): a public inline member
// that forwards both arguments unchanged to makeEvent (edm::RawInputSource::makeEvent
// according to the reference list).
54  {
55  makeEvent(eventPrincipal, aux);
56  }
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)

◆ maybeOpenNewLumiSection()

void DAQSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 327 of file DAQSource.cc.

References currentLumiSection_, edm::Timestamp::invalidTimestamp(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), and edm::InputSource::setLuminosityBlockAuxiliary().

Referenced by getNextDataBlock().

// Body of DAQSource::maybeOpenNewLumiSection(lumiSection).
// Does nothing when a luminosity-block auxiliary already exists for `lumiSection`.
// Otherwise it records the new lumi in currentLumiSection_, takes the current
// wall-clock time as the lumi open time, installs a new auxiliary built from the run
// number, `lumiSection`, that open time and an invalid end timestamp, tags it with
// processHistoryID_ and logs the transition.
// NOTE(review): listing lines 331 and 337 are absent from this rendering; presumably
// they reset the previous auxiliary and begin the construction of
// `lumiBlockAuxiliary` (the reference list names resetLuminosityBlockAuxiliary) -
// confirm against DAQSource.cc.
327  {
328  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
329  currentLumiSection_ = lumiSection;
330 
332 
333  timeval tv;
334  gettimeofday(&tv, nullptr);
// seconds scaled by 1e6 plus microseconds, combined into a single 64-bit value
335  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
336 
338  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
339 
340  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
341  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
342 
343  edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
344  }
345 }
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
Log< level::Info, false > LogInfo
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264

◆ processHistoryID()

edm::ProcessHistoryID& DAQSource::processHistoryID ( )
inline

Definition at line 59 of file DAQSource.h.

References processHistoryID_.

Referenced by DataModeFRD::readEvent(), DataModeScoutingRun3::readEvent(), and DataModeFRDStriped::readEvent().

// Body of DAQSource::processHistoryID(): returns the processHistoryID_ member; the
// prototype above declares the return type as a non-const reference, so callers can
// modify the member through it.
59 { return processHistoryID_; }
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117

◆ read()

void DAQSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 548 of file DAQSource.cc.

References checkEvery_, chunkIsFree_, currentFile_, currentFileIndex_, dataMode_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, evf::FastMonitoringService::isExceptionOnData(), setMonState(), streamFileTracker_, and edm::EventPrincipal::streamID().

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), and readWorker().

// Body of DAQSource::read(eventPrincipal).
// Lets dataMode_ fill the event, counts it in eventsThisLumi_, and records in
// streamFileTracker_ which file index the event's stream is currently using.
// Periodically it walks filesToDelete_ (under fileDeleteLock_) and erases every entry
// whose file index is not referenced by any stream, unless fms_ reports an exception
// on that file's lumi. Finally, when the data block is completed and chunkIsFree_ is
// set, the previous chunk is handed back to freeChunks_.
// NOTE(review): listing lines 549, 554 and 585 are absent from this rendering;
// presumably they are monitoring-state calls (the reference list names inReadEvent,
// inReadCleanup and inNoRequest) - confirm against DAQSource.cc.
548  {
550 
551  dataMode_->readEvent(eventPrincipal);
552 
553  eventsThisLumi_++;
555 
556  //resize vector if needed
557  while (streamFileTracker_.size() <= eventPrincipal.streamID())
558  streamFileTracker_.push_back(-1);
559 
560  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
561 
// NOTE(review): the interval actually used is checkEvery_, whose value is not shown
// here; the "10 events" in the comment below may be outdated - confirm.
562  //this old file check runs no more often than every 10 events
563  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
564  //delete files that are not in processing
565  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
566  auto it = filesToDelete_.begin();
567  while (it != filesToDelete_.end()) {
568  bool fileIsBeingProcessed = false;
569  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
570  if (it->first == streamFileTracker_.at(i)) {
571  fileIsBeingProcessed = true;
572  break;
573  }
574  }
575  if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
// erase returns the iterator to the next element, so the loop continues safely
576  it = filesToDelete_.erase(it);
577  } else
578  it++;
579  }
580  }
581  if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
// the chunk being released is the one before the current chunk index
582  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
583  chunkIsFree_ = false;
584  }
586  return;
587 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
int currentFileIndex_
Definition: DAQSource.h:158
unsigned int checkEvery_
Definition: DAQSource.h:162
bool chunkIsFree_
Definition: DAQSource.h:135
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
std::vector< int > streamFileTracker_
Definition: DAQSource.h:161
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1328
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159

◆ readSupervisor()

void DAQSource::readSupervisor ( )
private

possibility to use by new formats

Definition at line 593 of file DAQSource.cc.

References addFile(), alwaysStartFromFirstLS_, cms::cuda::assert(), evf::EvFDaqDirector::buBaseRunDir(), visDQMUpload::buf, currentLumiSection(), cvReader_, cvWakeup_, daqDirector_, dataMode_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, evf::EvFDaqDirector::getEoRFileName(), getFile(), evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), mps_fire::i, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, createfilelist::int, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), evf::EvFDaqDirector::lumisectionDiscarded(), SiStripPI::max, maxBufferedFiles_, maxChunkSize_, SiStripPI::min, eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::numConcurrentLumis(), evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, funct::pow(), quit_threads_, RawInputFile, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), rng_, evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, 
thread_quit_signal, tid_active_, mps_check::timeout, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

593  {
594  bool stop = false;
595  unsigned int currentLumiSection = 0;
596 
597  {
598  std::unique_lock<std::mutex> lk(startupLock_);
599  startupCv_.notify_one();
600  }
601 
602  uint32_t ls = 0;
603  uint32_t monLS = 1;
604  uint32_t lockCount = 0;
605  uint64_t sumLockWaitTimeUs = 0.;
606 
607  bool requireHeader = dataMode_->requireHeader();
608 
609  while (!stop) {
610  //wait for at least one free thread and chunk
611  int counter = 0;
612 
613  while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
614  //report state to monitoring
615  if (fms_) {
616  bool copy_active = false;
617  for (auto j : tid_active_)
618  if (j)
619  copy_active = true;
622  else if (freeChunks_.empty()) {
623  if (copy_active)
625  else
627  } else {
628  if (copy_active)
630  else
632  }
633  }
634  std::unique_lock<std::mutex> lkw(mWakeup_);
635  //sleep until woken up by condition or a timeout
636  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
637  counter++;
638  if (!(counter % 6000)) {
639  edm::LogWarning("FedRawDataInputSource")
640  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
641  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
642  << " / " << maxBufferedFiles_;
643  }
644  LogDebug("DAQSource") << "No free chunks or threads...";
645  } else {
646  assert(!workerPool_.empty() || freeChunks_.empty());
647  }
648  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
649  stop = true;
650  break;
651  }
652  }
653  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
654 
655  if (stop)
656  break;
657 
658  //look for a new file
659  std::string nextFile;
660  int64_t fileSizeFromMetadata;
661 
662  if (fms_) {
665  }
666  bool fitToBuffer = dataMode_->fitToBuffer();
667 
669  uint16_t rawHeaderSize = 0;
670  uint32_t lsFromRaw = 0;
671  int32_t serverEventsInNewFile = -1;
672  int rawFd = -1;
673 
674  int backoff_exp = 0;
675 
676  //entering loop which tries to grab new file from ramdisk
678  //check if hltd has signalled to throttle input
679  counter = 0;
680  while (daqDirector_->inputThrottled()) {
681  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
682  break;
683 
684  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
685  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
686  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
687  bool hasDiscardedLumi = false;
688  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
690  edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
691  hasDiscardedLumi = true;
692  break;
693  }
694  }
695  if (hasDiscardedLumi)
696  break;
697 
699  if (!(counter % 50))
700  edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
701  usleep(100000);
702  counter++;
703  }
704 
705  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
706  stop = true;
707  break;
708  }
709 
710  assert(rawFd == -1);
711  uint64_t thisLockWaitTimeUs = 0.;
713  if (fileListMode_) {
714  //return LS if LS not set, otherwise return file
715  status = getFile(ls, nextFile, thisLockWaitTimeUs);
717  uint16_t rawDataType;
719  rawFd,
720  rawHeaderSize,
721  rawDataType,
722  lsFromRaw,
723  serverEventsInNewFile,
724  fileSizeFromMetadata,
725  requireHeader,
726  false,
727  false) != 0) {
728  //error
729  setExceptionState_ = true;
730  stop = true;
731  break;
732  }
733  }
734  } else {
736  ls,
737  nextFile,
738  rawFd,
739  rawHeaderSize, //which format?
740  serverEventsInNewFile,
741  fileSizeFromMetadata,
742  thisLockWaitTimeUs,
743  requireHeader);
744  }
745 
747 
748  //cycle through all remaining LS even if no files get assigned
751 
752  //monitoring of lock wait time
753  if (thisLockWaitTimeUs > 0.)
754  sumLockWaitTimeUs += thisLockWaitTimeUs;
755  lockCount++;
756  if (ls > monLS) {
757  monLS = ls;
758  if (lockCount)
759  if (fms_)
760  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
761  lockCount = 0;
762  sumLockWaitTimeUs = 0;
763  }
764 
766  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
767  stop = true;
768  break;
769  }
770 
771  //error from filelocking function
773  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
774  stop = true;
775  break;
776  }
777  //queue new lumisection
778  if (ls > currentLumiSection) {
779  //new file service
782  //start transitions from LS specified by env, continue if not reached
783  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
784  //skip file if from earlier LS than specified by env
785  if (rawFd != -1) {
786  close(rawFd);
787  rawFd = -1;
788  }
790  continue;
791  } else {
792  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
793  }
794  } else if (ls < 100) {
795  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
796  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
797 
798  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
799  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
800  }
801  } else {
802  //start from current LS
803  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
804  }
805  } else {
806  //queue all lumisections after last one seen to avoid gaps
807  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
808  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
809  }
810  }
812  }
813  //else
815  edm::LogError("DAQSource") << "Got old LS (" << ls
816  << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
817  << ". Aborting execution." << std::endl;
818  if (rawFd != -1)
819  close(rawFd);
820  rawFd = -1;
821  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
822  stop = true;
823  break;
824  }
825 
826  int dbgcount = 0;
829  dbgcount++;
830  if (!(dbgcount % 20))
831  LogDebug("DAQSource") << "No file for me... sleep and try again...";
832 
833  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
834  //backoff_exp=0; // disabled!
835  int sleeptime = (int)(100000. * pow(2, backoff_exp));
836  usleep(sleeptime);
837  backoff_exp++;
838  } else
839  backoff_exp = 0;
840  }
841  //end of file grab loop, parse result
844  LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
845 
846  std::string rawFile;
847  //file service will report raw extension
848  rawFile = nextFile;
849 
850  struct stat st;
851  int stat_res = stat(rawFile.c_str(), &st);
852  if (stat_res == -1) {
853  edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
854  setExceptionState_ = true;
855  break;
856  }
857  uint64_t fileSize = st.st_size;
858 
859  if (fms_) {
863  }
864  int eventsInNewFile;
865  if (fileListMode_) {
866  if (fileSize == 0)
867  eventsInNewFile = 0;
868  else
869  eventsInNewFile = -1;
870  } else {
871  eventsInNewFile = serverEventsInNewFile;
872  assert(eventsInNewFile >= 0);
873  assert((eventsInNewFile > 0) ==
874  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
875  }
876 
877  std::pair<bool, std::vector<std::string>> additionalFiles =
878  dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
879  if (!additionalFiles.first) {
880  //skip secondary files from file broker
881  if (rawFd > -1)
882  close(rawFd);
883  continue;
884  }
885 
886  std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
887  ls,
888  rawFile,
889  !fileListMode_,
890  rawFd,
891  fileSize,
892  rawHeaderSize, //for which format
893  0,
894  eventsInNewFile,
895  this));
896 
897  uint64_t neededSize = fileSize;
898  for (const auto& addFile : additionalFiles.second) {
899  struct stat buf;
900  //wait for secondary files to appear
901  unsigned int fcnt = 0;
902  while (stat(addFile.c_str(), &buf) != 0) {
903  if (fileListMode_) {
904  edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
905  stop = true;
906  setExceptionState_ = true;
907  break;
908  }
909  usleep(10000);
910  fcnt++;
911  //report and EoR check every 30 seconds
912  if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
913  edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
914  struct stat bufEoR;
915  auto secondaryPath = std::filesystem::path(addFile).parent_path();
917  std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
918  std::string secondaryEoR = (secondaryPath / eorName).generic_string();
919  bool prematureEoR = false;
920  if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
921  if (stat(addFile.c_str(), &bufEoR) != 0) {
922  edm::LogError("DAQSource")
923  << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
924  prematureEoR = true;
925  }
926  } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
927  //wait another 10 seconds
928  usleep(10000000);
929  if (stat(addFile.c_str(), &bufEoR) != 0) {
930  edm::LogError("DAQSource")
931  << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
932  prematureEoR = true;
933  }
934  }
935  if (prematureEoR) {
936  //queue EoR since this is not FU error
937  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
938  stop = true;
939  break;
940  }
941  }
942 
943  if (quit_threads_) {
944  edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
945  stop = true;
946  setExceptionState_ = true;
947  break;
948  }
949  }
950  LogDebug("DAQSource") << " APPEND NAME " << addFile;
951  if (stop)
952  break;
953 
954  newInputFile->appendFile(addFile, buf.st_size);
955  neededSize += buf.st_size;
956  }
957  if (stop)
958  break;
959 
960  //calculate number of needed chunks and size if resizing will be applied
961  uint16_t neededChunks;
962  uint64_t chunkSize;
963 
964  if (fitToBuffer) {
965  chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
966  neededChunks = 1;
967  } else {
968  chunkSize = eventChunkSize_;
969  neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
970  }
971  newInputFile->setChunks(neededChunks);
972 
973  newInputFile->randomizeOrder(rng_);
974 
976  auto newInputFilePtr = newInputFile.get();
977  fileQueue_.push(std::move(newInputFile));
978 
979  for (size_t i = 0; i < neededChunks; i++) {
980  if (fms_) {
981  bool copy_active = false;
982  for (auto j : tid_active_)
983  if (j)
984  copy_active = true;
985  if (copy_active)
987  else
989  }
990  //get thread
991  unsigned int newTid = 0xffffffff;
992  while (!workerPool_.try_pop(newTid)) {
993  usleep(100000);
994  if (quit_threads_.load(std::memory_order_relaxed)) {
995  stop = true;
996  break;
997  }
998  }
999 
1000  if (fms_) {
1001  bool copy_active = false;
1002  for (auto j : tid_active_)
1003  if (j)
1004  copy_active = true;
1005  if (copy_active)
1007  else
1009  }
1010  InputChunk* newChunk = nullptr;
1011  while (!freeChunks_.try_pop(newChunk)) {
1012  usleep(100000);
1013  if (quit_threads_.load(std::memory_order_relaxed)) {
1014  stop = true;
1015  break;
1016  }
1017  }
1018 
1019  if (newChunk == nullptr) {
1020  //return unused tid if we received shutdown (nullptr chunk)
1021  if (newTid != 0xffffffff)
1022  workerPool_.push(newTid);
1023  stop = true;
1024  break;
1025  }
1026  if (stop)
1027  break;
1029 
1030  std::unique_lock<std::mutex> lk(mReader_);
1031 
1032  uint64_t toRead = chunkSize;
1033  if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1034  toRead = neededSize % chunkSize;
1035  newChunk->reset(i * chunkSize, toRead, i);
1036 
1037  workerJob_[newTid].first = newInputFilePtr;
1038  workerJob_[newTid].second = newChunk;
1039 
1040  //wake up the worker thread
1041  cvReader_[newTid]->notify_one();
1042  }
1043  }
1044  }
1046  //make sure threads finish reading
1047  unsigned int numFinishedThreads = 0;
1048  while (numFinishedThreads < workerThreads_.size()) {
1049  unsigned int tid = 0;
1050  while (!workerPool_.try_pop(tid)) {
1051  usleep(10000);
1052  }
1053  std::unique_lock<std::mutex> lk(mReader_);
1054  thread_quit_signal[tid] = true;
1055  cvReader_[tid]->notify_one();
1056  numFinishedThreads++;
1057  }
1058  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1059  workerThreads_[i]->join();
1060  delete workerThreads_[i];
1061  }
1062 }
std::string & buBaseRunDir()
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::condition_variable startupCv_
Definition: DAQSource.h:156
volatile std::atomic< bool > shutdown_flag
bool lumisectionDiscarded(unsigned int ls)
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
unsigned int numConcurrentLumis() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::mutex mWakeup_
Definition: DAQSource.h:165
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
uint64_t eventChunkSize_
Definition: DAQSource.h:91
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
Definition: DAQSource.h:52
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
const bool fileListMode_
Definition: DAQSource.h:109
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
std::string getEoRFileName() const
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
void setMonStateSup(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1333
friend class RawInputFile
Definition: DAQSource.h:42
bool setExceptionState_
Definition: DAQSource.h:154
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
unsigned int getLumisectionToStart() const
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
Log< level::Warning, false > LogWarning
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1434
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:29
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)

◆ readWorker()

void DAQSource::readWorker ( unsigned int  tid)
private

Definition at line 1064 of file DAQSource.cc.

References addFile(), cms::cuda::assert(), InputChunk::buf_, cvReader_, dataMode_, change_name::diff, mps_fire::end, eventChunkBlock_, eventChunkSize_, geometryDiff::file, InputChunk::fileIndex_, dqmdumpme::first, mps_fire::i, dqmiolumiharvest::j, dqmdumpme::last, LogDebug, maxChunkSize_, SiStripPI::min, mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, read(), InputChunk::readComplete_, InputChunk::resize(), alignCSCRings::s, edm::second(), setExceptionState_, InputChunk::size_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by DAQSource().

// Body of DAQSource::readWorker(unsigned int tid) as listed from DAQSource.cc (the leading
// numbers are the listing's own line numbers). Each pass of the outer loop pushes `tid` into
// workerPool_, waits on cvReader_[tid], and then reads the chunk assigned through
// workerJob_[tid] (primary file plus any additional files) into chunk->buf_. The function
// returns when thread_quit_signal[tid] is found set after the wait.
1064  {
1065  bool init = true;
1066  threadInit_.exchange(true, std::memory_order_acquire);
1067 
1068  while (true) {
// Mark this worker as not active, then take mReader_ and clear the job slot.
// NOTE(review): `.first` is reset twice and `.second` is not reset here; the second statement
// was presumably meant to clear `.second` - confirm against DAQSource.cc.
1069  tid_active_[tid] = false;
1070  std::unique_lock<std::mutex> lk(mReader_);
1071  workerJob_[tid].first = nullptr;
1072  workerJob_[tid].first = nullptr;
1073 
1074  assert(!thread_quit_signal[tid]); //should never get it here
// Publish this thread id as available; readSupervisor pops ids from workerPool_.
1075  workerPool_.push(tid);
1076 
// First pass only: notify startupCv_ while holding startupLock_.
// The inner `lk` shadows the outer `lk` (mReader_) for the duration of this scope.
1077  if (init) {
1078  std::unique_lock<std::mutex> lk(startupLock_);
1079  init = false;
1080  startupCv_.notify_one();
1081  }
// NOTE(review): wait() is called without a predicate, so any wakeup that arrives without a job
// assigned would reach the assert at listing line 1091 - confirm this cannot happen.
// NOTE(review): no unlock of `lk` is visible after the wait, so mReader_ appears to stay locked
// for the rest of the iteration, including the file reads below - confirm this is intended.
1082  cvReader_[tid]->wait(lk);
1083 
1084  if (thread_quit_signal[tid])
1085  return;
1086  tid_active_[tid] = true;
1087 
1088  RawInputFile* file;
1089  InputChunk* chunk;
1090 
1091  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1092 
1093  file = workerJob_[tid].first;
1094  chunk = workerJob_[tid].second;
1095 
1096  bool fitToBuffer = dataMode_->fitToBuffer();
1097 
// In fitToBuffer mode the summed size of all files of the set must fit in one chunk: when the
// sum exceeds eventChunkSize_ the chunk is resized (bounded by maxChunkSize_). If resize()
// fails, the error is logged, setExceptionState_ is set and the job is abandoned.
1098  //resize if multi-chunked reading is not possible
1099  if (fitToBuffer) {
1100  uint64_t accum = 0;
1101  for (auto s : file->diskFileSizes_)
1102  accum += s;
1103  if (accum > eventChunkSize_) {
1104  if (!chunk->resize(accum, maxChunkSize_)) {
1105  edm::LogError("DAQSource")
1106  << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1107  if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1108  close(file->rawFd_);
1109  setExceptionState_ = true;
1110  continue;
1111  } else {
1112  edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1113  }
1114  }
1115  }
1116 
// bufferLeftInitial is the buffer offset at which the primary read starts writing:
// rawHeaderSize_ for the first chunk when a descriptor was inherited, otherwise 0.
// readBlocks is chunk->size_ divided by eventChunkBlock_, rounded up, stored as uint16_t.
1117  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1118  unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1119  const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1120 
// readPrimary: reads this chunk's part of the primary file into chunk->buf_, starting at buffer
// offset `bufferLeft`. Failures are reported by setting setExceptionState_.
1121  auto readPrimary = [&](uint64_t bufferLeft) {
1122  //BEGIN reading primary file - check if file descriptor is already open
1123  //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1124  //fd will not be closed in other case (used by other threads)
1125  int fileDescriptor = -1;
1126  bool fileOpenedHere = false;
1127 
// Take over file->rawFd_ when allowed (single concurrent read, or first chunk); otherwise, or
// when no descriptor was inherited, open the file by name.
1128  if (numConcurrentReads_ == 1) {
1129  fileDescriptor = file->rawFd_;
1130  file->rawFd_ = -1;
1131  if (fileDescriptor == -1) {
1132  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1133  fileOpenedHere = true;
1134  }
1135  } else {
1136  if (chunk->offset_ == 0) {
1137  fileDescriptor = file->rawFd_;
1138  file->rawFd_ = -1;
1139  if (fileDescriptor == -1) {
1140  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1141  fileOpenedHere = true;
1142  }
1143  } else {
1144  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1145  fileOpenedHere = true;
1146  }
1147  }
1148 
1149  if (fileDescriptor == -1) {
1150  edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1151  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1152  setExceptionState_ = true;
1153  return;
1154  }
1155 
// A descriptor opened here starts at offset 0, so it is moved to this chunk's file offset.
// NOTE(review): the early returns below (seek failure, read failure) leave fileDescriptor open;
// no close() is visible on those paths.
1156  if (fileOpenedHere) { //fast forward to this chunk position
1157  off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1158  if (pos == -1) {
1159  edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1160  << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1161  << " error: " << strerror(errno);
1162  setExceptionState_ = true;
1163  return;
1164  }
1165  }
1166 
1167  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1168  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1169 
// NOTE(review): listing line 1171 is absent from this rendering; `start`, used at listing
// line 1228, is not declared in the visible text (presumably a clock timestamp taken here).
1170  size_t skipped = bufferLeft;
// Read in pieces of at most eventChunkBlock_ bytes, never past chunk->usedSize_; a read shorter
// than eventChunkBlock_ ends the loop.
1172  for (unsigned int i = 0; i < readBlocks; i++) {
1173  ssize_t last;
1174  edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1175  << (int64_t)eventChunkBlock_;
1176 
1177  //protect against reading into next block
1178  last = ::read(fileDescriptor,
1179  (void*)(chunk->buf_ + bufferLeft),
1180  std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1181 
1182  if (last < 0) {
1183  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1184  << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1185  setExceptionState_ = true;
1186  break;
1187  }
1188  if (last > 0) {
1189  bufferLeft += last;
1190  }
1191  if ((uint64_t)last < eventChunkBlock_) { //last read
1192  edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1193  << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1194  //check if this is last block if single file, then total read size must match file size
1195  if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1196  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1197  << " fd:" << fileDescriptor << " last:" << last
1198  << " expectedChunkSize:" << chunk->usedSize_
1199  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1200  << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1201  << " error: " << strerror(errno);
1202  setExceptionState_ = true;
1203  }
1204  break;
1205  }
1206  }
1207  if (setExceptionState_)
1208  return;
1209 
1210  file->fileSizes_[0] = bufferLeft;
1211 
// NOTE(review): fileDescriptor cannot be -1 at this point (the -1 case returned at listing
// line 1153), so the assert in the else branch would fail whenever that branch is taken with
// assertions enabled - confirm the intended condition.
1212  if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1213  //file reading finished using this fd
1214  //or the whole buffer is filled (single sequential file spread over more chunks)
1215  close(fileDescriptor);
1216  fileDescriptor = -1;
1217  } else
1218  assert(fileDescriptor == -1);
1219 
1220  if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1221  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1222  << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1223  setExceptionState_ = true;
1224  return;
1225  }
1226 
// NOTE(review): listing line 1227 is absent from this rendering; `end` is not declared in the
// visible text.
1228  auto diff = end - start;
1229  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1230  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1231  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1232  << " GB/s)";
1233  };
1234  //END primary function
1235 
// readSecondary: opens additional file number `j` (file->fileNames_[j]) and reads it into
// chunk->buf_ starting at buffer offset `bufferLeft`; the bytes read are recorded in
// file->fileSizes_[j] and added to file->fileSize_.
1236  //SECONDARY files function
1237  auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1238  size_t fileLen = 0;
1239 
1240  std::string const& addFile = file->fileNames_[j];
1241  int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1242 
1243  if (fileDescriptor < 0) {
1244  edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1245  << " error: " << strerror(errno);
1246  setExceptionState_ = true;
1247  return;
1248  }
1249 
1250  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1251  << lseek(fileDescriptor, 0, SEEK_CUR);
1252 
// NOTE(review): listing line 1254 is absent from this rendering (presumably the declaration of
// `start` used at listing line 1290).
1253  //size_t skipped = 0;//file is newly opened, read with header
// Up to readBlocks read() calls, each requesting min(diskFileSizes_[j], eventChunkBlock_) bytes.
// NOTE(review): on a read error the descriptor is closed inside the loop and closed again at
// listing line 1278 after the break, and assert(fileLen > 0) is still evaluated - confirm.
1255  for (unsigned int i = 0; i < readBlocks; i++) {
1256  ssize_t last;
1257 
1258  //protect against reading into next block
1259  //use bufferLeft for the write offset
1260  last = ::read(fileDescriptor,
1261  (void*)(chunk->buf_ + bufferLeft),
1262  std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1263 
1264  if (last < 0) {
1265  edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1266  << " error: " << strerror(errno);
1267  setExceptionState_ = true;
1268  close(fileDescriptor);
1269  break;
1270  }
1271  if (last > 0) {
1272  bufferLeft += last;
1273  fileLen += last;
1274  file->fileSize_ += last;
1275  }
1276  };
1277 
1278  close(fileDescriptor);
1279  file->fileSizes_[j] = fileLen;
1280  assert(fileLen > 0);
1281 
1282  if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1283  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1284  << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1285  setExceptionState_ = true;
1286  return;
1287  }
1288 
// NOTE(review): listing line 1289 is absent from this rendering; `end` is not declared in the
// visible text.
1290  auto diff = end - start;
1291  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1292  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1293  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1294  << " GB/s)";
1295  };
1296 
// Visit the files in the order stored in file->fileOrder_: index 0 is the primary file, any
// other index is an additional file written at file->bufferOffsets_[j]. Stop at the first error.
1297  //randomized order multi-file loop
1298  for (unsigned int j : file->fileOrder_) {
1299  if (j == 0) {
1300  readPrimary(bufferLeftInitial);
1301  } else
1302  readSecondary(file->bufferOffsets_[j], j);
1303 
1304  if (setExceptionState_)
1305  break;
1306  }
1307 
// On error the chunk is not marked complete; the loop restarts and the thread re-registers.
1308  if (setExceptionState_)
1309  continue;
1310 
1311  //detect FRD event version. Skip file Header if it exists
1312  if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1313  dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1314  }
1315  assert(dataMode_->versionCheck());
1316 
// Mark the chunk as read, then store it in the file's chunk vector at chunk->fileIndex_.
1317  chunk->readComplete_ =
1318  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1319  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1320  }
1321 }
Definition: start.py:1
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::condition_variable startupCv_
Definition: DAQSource.h:156
std::atomic< bool > threadInit_
Definition: DAQSource.h:171
void read(edm::EventPrincipal &eventPrincipal) override
Definition: DAQSource.cc:548
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
assert(be >=bs)
uint64_t eventChunkSize_
Definition: DAQSource.h:91
U second(std::pair< T, U > const &p)
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
unsigned char * buf_
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
unsigned int fileIndex_
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long uint64_t
Definition: Time.h:13
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
bool setExceptionState_
Definition: DAQSource.h:154
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
bool resize(uint64_t wantedSize, uint64_t maxSize)
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
#define LogDebug(id)

◆ reportEventsThisLumiInSource()

void DAQSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1379 of file DAQSource.cc.

References events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextDataBlock().

// Body of DAQSource::reportEventsThisLumiInSource(unsigned int lumi, unsigned int events):
// while holding monlock_, adds `events` to the entry for `lumi` in sourceEventsReport_ when one
// exists, otherwise creates the entry.
1379  {
1380  std::lock_guard<std::mutex> lock(monlock_);
1381  auto itr = sourceEventsReport_.find(lumi);
1382  if (itr != sourceEventsReport_.end())
1383  itr->second += events;
1384  else
// Listing line 1385 was missing from this rendering, leaving the `else` without a statement.
// Restored as the insert of a new entry - NOTE(review): confirm the exact text in DAQSource.cc.
1385  sourceEventsReport_[lumi] = events;
1386 }
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
int events
std::mutex monlock_
Definition: DAQSource.h:174

◆ rewind_()

void DAQSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 589 of file DAQSource.cc.

// Body of the DAQSource::rewind_() override: empty, so the call performs no work.
589 {}

◆ setMonState()

void DAQSource::setMonState ( evf::FastMonState::InputState  state)
protected

Definition at line 1328 of file DAQSource.cc.

References fms_, evf::FastMonitoringService::setInState(), and edm::InputSource::state().

Referenced by RawInputFile::advance(), checkNext(), getNextDataBlock(), getNextEventFromDataBlock(), and read().

// Body of DAQSource::setMonState(evf::FastMonState::InputState state): forwards `state` to
// fms_->setInState() when the fms_ pointer is non-null; otherwise does nothing.
1328  {
1329  if (fms_)
1330  fms_->setInState(state);
1331 }
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
ItemTypeInfo state() const
Definition: InputSource.h:361
void setInState(FastMonState::InputState inputState)

◆ setMonStateSup()

void DAQSource::setMonStateSup ( evf::FastMonState::InputState  state)
protected

Definition at line 1333 of file DAQSource.cc.

References fms_, evf::FastMonitoringService::setInStateSup(), and edm::InputSource::state().

Referenced by readSupervisor().

// Body of DAQSource::setMonStateSup(evf::FastMonState::InputState state): forwards `state` to
// fms_->setInStateSup() when the fms_ pointer is non-null; otherwise does nothing.
1333  {
1334  if (fms_)
// Listing line 1335 was missing from this rendering, leaving the `if` without a statement.
// Restored from the member's reference list (evf::FastMonitoringService::setInStateSup).
1335  fms_->setInStateSup(state);
1336 }
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void setInStateSup(FastMonState::InputState inputState)
ItemTypeInfo state() const
Definition: InputSource.h:361

◆ threadError()

void DAQSource::threadError ( )
private

Definition at line 1323 of file DAQSource.cc.

References Exception, and quit_threads_.

Referenced by RawInputFile::advance(), and getNextDataBlock().

// Body of DAQSource::threadError(): sets the atomic quit_threads_ flag, then throws a
// cms::Exception constructed with "DAQSource:threadError". Never returns normally.
1323  {
1324  quit_threads_ = true;
1325  throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1326 }
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152

◆ useL1EventID()

bool DAQSource::useL1EventID ( ) const
inline

Definition at line 51 of file DAQSource.h.

References useL1EventID_.

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

// Inline accessor body: returns the value of the const member useL1EventID_.
51 { return useL1EventID_; }
const bool useL1EventID_
Definition: DAQSource.h:103

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 43 of file DAQSource.h.

Referenced by DAQSource().

◆ RawInputFile

friend class RawInputFile
friend

Definition at line 42 of file DAQSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool DAQSource::alwaysStartFromFirstLS_
private

Definition at line 101 of file DAQSource.h.

Referenced by readSupervisor().

◆ checkEvery_

unsigned int DAQSource::checkEvery_ = 10
private

Definition at line 162 of file DAQSource.h.

Referenced by read().

◆ chunkIsFree_

bool DAQSource::chunkIsFree_ = false
private

Definition at line 135 of file DAQSource.h.

Referenced by getNextDataBlock(), and read().

◆ currentFile_

std::unique_ptr<RawInputFile> DAQSource::currentFile_
private

Definition at line 134 of file DAQSource.h.

Referenced by getNextDataBlock(), getNextEventFromDataBlock(), read(), and ~DAQSource().

◆ currentFileIndex_

int DAQSource::currentFileIndex_ = -1
private

Definition at line 158 of file DAQSource.h.

Referenced by getNextDataBlock(), and read().

◆ currentLumiSection_

unsigned int DAQSource::currentLumiSection_
private

◆ cvReader_

std::vector<std::unique_ptr<std::condition_variable> > DAQSource::cvReader_
private

Definition at line 149 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), readWorker(), and ~DAQSource().

◆ cvWakeup_

std::condition_variable DAQSource::cvWakeup_
private

Definition at line 166 of file DAQSource.h.

Referenced by getNextDataBlock(), and readSupervisor().

◆ daqDirector_

evf::EvFDaqDirector* DAQSource::daqDirector_ = nullptr
private

Definition at line 88 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), getNextDataBlock(), and readSupervisor().

◆ dataArrangerThread_

std::unique_ptr<std::thread> DAQSource::dataArrangerThread_
private

Definition at line 139 of file DAQSource.h.

◆ dataMode_

std::shared_ptr<DataMode> DAQSource::dataMode_
private

◆ dataModeConfig_

const std::string DAQSource::dataModeConfig_
private

Definition at line 90 of file DAQSource.h.

Referenced by DAQSource().

◆ eventChunkBlock_

uint64_t DAQSource::eventChunkBlock_
private

Definition at line 93 of file DAQSource.h.

Referenced by DAQSource(), and readWorker().

◆ eventChunkSize_

uint64_t DAQSource::eventChunkSize_
private

Definition at line 91 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ eventRunNumber_

uint32_t DAQSource::eventRunNumber_ = 0
private

Definition at line 120 of file DAQSource.h.

Referenced by checkNext(), and eventRunNumber().

◆ eventsThisLumi_

unsigned int DAQSource::eventsThisLumi_
private

Definition at line 122 of file DAQSource.h.

Referenced by checkNext(), getNextDataBlock(), and read().

◆ eventsThisRun_

unsigned long DAQSource::eventsThisRun_ = 0
private

Definition at line 123 of file DAQSource.h.

◆ fileDeleteLock_

std::mutex DAQSource::fileDeleteLock_
private

Definition at line 160 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), read(), and ~DAQSource().

◆ fileDescriptor_

int DAQSource::fileDescriptor_ = -1
private

Definition at line 169 of file DAQSource.h.

◆ fileListIndex_

unsigned int DAQSource::fileListIndex_ = 0
private

Definition at line 110 of file DAQSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool DAQSource::fileListLoopMode_
private

Definition at line 111 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), fileListLoopMode(), and getFile().

◆ fileListMode_

const bool DAQSource::fileListMode_
private

Definition at line 109 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), getNextDataBlock(), and readSupervisor().

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<RawInputFile> > DAQSource::fileQueue_
private

Definition at line 146 of file DAQSource.h.

Referenced by getNextDataBlock(), and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > DAQSource::filesToDelete_
private

Definition at line 159 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), read(), and ~DAQSource().

◆ fms_

evf::FastMonitoringService* DAQSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> DAQSource::freeChunks_
private

Definition at line 145 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), read(), and readSupervisor().

◆ fuOutputDir_

std::string DAQSource::fuOutputDir_
private

Definition at line 115 of file DAQSource.h.

◆ GTPEventID_

uint32_t DAQSource::GTPEventID_ = 0
private

Definition at line 121 of file DAQSource.h.

◆ listFileNames_

std::vector<std::string> DAQSource::listFileNames_
private

Definition at line 105 of file DAQSource.h.

Referenced by getFile(), and initFileList().

◆ loopModeIterationInc_

unsigned int DAQSource::loopModeIterationInc_ = 0
private

Definition at line 112 of file DAQSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int DAQSource::maxBufferedFiles_
private

Definition at line 96 of file DAQSource.h.

Referenced by readSupervisor().

◆ maxChunkSize_

uint64_t DAQSource::maxChunkSize_
private

Definition at line 92 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ monlock_

std::mutex DAQSource::monlock_
private

Definition at line 174 of file DAQSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

◆ mReader_

std::mutex DAQSource::mReader_
private

Definition at line 148 of file DAQSource.h.

Referenced by readSupervisor(), readWorker(), and ~DAQSource().

◆ mWakeup_

std::mutex DAQSource::mWakeup_
private

Definition at line 165 of file DAQSource.h.

Referenced by getNextDataBlock(), and readSupervisor().

◆ numBuffers_

unsigned int DAQSource::numBuffers_
private

Definition at line 95 of file DAQSource.h.

Referenced by DAQSource().

◆ numConcurrentReads_

unsigned int DAQSource::numConcurrentReads_
private

Definition at line 97 of file DAQSource.h.

Referenced by DAQSource(), and readWorker().

◆ processHistoryID_

edm::ProcessHistoryID DAQSource::processHistoryID_
private

Definition at line 117 of file DAQSource.h.

Referenced by DAQSource(), maybeOpenNewLumiSection(), and processHistoryID().

◆ quit_threads_

std::atomic<bool> DAQSource::quit_threads_
private

Definition at line 152 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), threadError(), and ~DAQSource().

◆ readBlocks_

unsigned int DAQSource::readBlocks_
private

Definition at line 94 of file DAQSource.h.

Referenced by DAQSource().

◆ readingFilesCount_

std::atomic<unsigned int> DAQSource::readingFilesCount_
private

Definition at line 98 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> DAQSource::readSupervisorThread_
private

Definition at line 138 of file DAQSource.h.

Referenced by checkNext(), and ~DAQSource().

◆ rng_

std::default_random_engine DAQSource::rng_
private

Definition at line 124 of file DAQSource.h.

Referenced by readSupervisor().

◆ runNumber_

edm::RunNumber_t DAQSource::runNumber_
private

Definition at line 114 of file DAQSource.h.

Referenced by checkNext(), and DAQSource().

◆ setExceptionState_

bool DAQSource::setExceptionState_ = false
private

Definition at line 154 of file DAQSource.h.

Referenced by exceptionState(), getNextDataBlock(), readSupervisor(), and readWorker().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> DAQSource::sourceEventsReport_
private

Definition at line 173 of file DAQSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool DAQSource::startedSupervisorThread_ = false
private

Definition at line 137 of file DAQSource.h.

Referenced by checkNext(), and ~DAQSource().

◆ startupCv_

std::condition_variable DAQSource::startupCv_
private

Definition at line 156 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), readSupervisor(), and readWorker().

◆ startupLock_

std::mutex DAQSource::startupLock_
private

Definition at line 155 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), readSupervisor(), and readWorker().

◆ streamFileTracker_

std::vector<int> DAQSource::streamFileTracker_
private

Definition at line 161 of file DAQSource.h.

Referenced by read().

◆ testTCDSFEDRange_

const std::vector<unsigned int> DAQSource::testTCDSFEDRange_
private

Definition at line 104 of file DAQSource.h.

Referenced by DAQSource().

◆ thread_quit_signal

std::vector<unsigned int> DAQSource::thread_quit_signal
private

Definition at line 153 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), readWorker(), and ~DAQSource().

◆ threadInit_

std::atomic<bool> DAQSource::threadInit_
private

Definition at line 171 of file DAQSource.h.

Referenced by readWorker().

◆ tid_active_

std::vector<unsigned int> DAQSource::tid_active_
private

Definition at line 150 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool DAQSource::useFileBroker_
private

Definition at line 106 of file DAQSource.h.

◆ useL1EventID_

const bool DAQSource::useL1EventID_
private

Definition at line 103 of file DAQSource.h.

Referenced by useL1EventID().

◆ verifyChecksum_

const bool DAQSource::verifyChecksum_
private

Definition at line 102 of file DAQSource.h.

Referenced by getNextEventFromDataBlock().

◆ workerJob_

std::vector<ReaderInfo> DAQSource::workerJob_
private

Definition at line 143 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> DAQSource::workerPool_
private

Definition at line 142 of file DAQSource.h.

Referenced by readSupervisor(), and readWorker().

◆ workerThreads_

std::vector<std::thread*> DAQSource::workerThreads_
private

Definition at line 140 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and ~DAQSource().