CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
DAQSource Class Reference

#include <DAQSource.h>

Inheritance diagram for DAQSource:
edm::RawInputSource edm::InputSource

Public Member Functions

int currentLumiSection () const
 
 DAQSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
int eventRunNumber () const
 
bool fileListLoopMode ()
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
void makeEventWrapper (edm::EventPrincipal &eventPrincipal, edm::EventAuxiliary &aux)
 
edm::ProcessHistoryIDprocessHistoryID ()
 
bool useL1EventID () const
 
 ~DAQSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliaryluminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSourceoperator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliaryreadLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
void setMonState (evf::FastMonState::InputState state)
 
void setMonStateSup (evf::FastMonState::InputState state)
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 
ProductRegistryproductRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< RawInputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

void dataArranger ()
 
bool exceptionState ()
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextDataBlock ()
 
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< RawInputFilecurrentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirectordaqDirector_ = nullptr
 
std::unique_ptr< std::thread > dataArrangerThread_
 
std::shared_ptr< DataModedataMode_
 
const std::string dataModeConfig_
 
uint64_t eventChunkBlock_
 
uint64_t eventChunkSize_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringServicefms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
uint32_t GTPEventID_ = 0
 
std::vector< std::string > listFileNames_
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
uint64_t maxChunkSize_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
std::default_random_engine rng_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
const std::vector< unsigned int > testTCDSFEDRange_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfoworkerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
class RawInputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 41 of file DAQSource.h.

Member Typedef Documentation

◆ ReaderInfo

typedef std::pair<RawInputFile*, InputChunk*> DAQSource::ReaderInfo
private

Definition at line 132 of file DAQSource.h.

Constructor & Destructor Documentation

◆ DAQSource()

DAQSource::DAQSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 32 of file DAQSource.cc.

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, dataMode_, dataModeConfig_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, evf::EvFDaqDirector::getBUBaseDirs(), mps_fire::i, evf::FastMonState::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), maxChunkSize_, FEDNumbering::MAXTCDSuTCAFEDID, FEDNumbering::MINTCDSuTCAFEDID, numBuffers_, numConcurrentReads_, Utilities::operator, evf::EvFDaqDirector::overrideRunNumber(), processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), muonDTDigis_cfi::pset, quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::runString(), evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), startupCv_, startupLock_, testTCDSFEDRange_, thread_quit_signal, tid_active_, workerJob_, and workerThreads_.

34  dataModeConfig_(pset.getUntrackedParameter<std::string>("dataMode")),
35  eventChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkSize")) << 20),
36  maxChunkSize_(uint64_t(pset.getUntrackedParameter<unsigned int>("maxChunkSize")) << 20),
37  eventChunkBlock_(uint64_t(pset.getUntrackedParameter<unsigned int>("eventChunkBlock")) << 20),
38  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers")),
39  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles")),
40  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
41  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum")),
42  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID")),
43  testTCDSFEDRange_(pset.getUntrackedParameter<std::vector<unsigned int>>("testTCDSFEDRange")),
44  listFileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames")),
45  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode")),
46  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
50  eventsThisLumi_(0),
51  rng_(std::chrono::system_clock::now().time_since_epoch().count()) {
52  char thishost[256];
53  gethostname(thishost, 255);
54 
55  if (maxChunkSize_ == 0)
57  else if (maxChunkSize_ < eventChunkSize_)
58  throw cms::Exception("DAQSource::DAQSource") << "maxChunkSize must be equal or larger than eventChunkSize";
59 
60  if (eventChunkBlock_ == 0)
63  throw cms::Exception("DAQSource::DAQSource") << "eventChunkBlock must be equal or smaller than eventChunkSize";
64 
65  edm::LogInfo("DAQSource") << "Construction. read-ahead chunk size -: " << std::endl
66  << (eventChunkSize_ >> 20) << " MB on host " << thishost << " in mode " << dataModeConfig_;
67 
68  uint16_t MINTCDSuTCAFEDID = FEDNumbering::MINTCDSuTCAFEDID;
69  uint16_t MAXTCDSuTCAFEDID = FEDNumbering::MAXTCDSuTCAFEDID;
70 
71  if (!testTCDSFEDRange_.empty()) {
72  if (testTCDSFEDRange_.size() != 2) {
73  throw cms::Exception("DAQSource::DAQSource") << "Invalid TCDS Test FED range parameter";
74  }
75  MINTCDSuTCAFEDID = testTCDSFEDRange_[0];
76  MAXTCDSuTCAFEDID = testTCDSFEDRange_[1];
77  }
78 
79  //load mode class based on parameter
80  if (dataModeConfig_ == "FRD") {
81  dataMode_.reset(new DataModeFRD(this));
82  } else if (dataModeConfig_ == "FRDStriped") {
83  dataMode_.reset(new DataModeFRDStriped(this));
84  } else
85  throw cms::Exception("DAQSource::DAQSource") << "Unknown data mode " << dataModeConfig_;
86 
88 
89  dataMode_->setTCDSSearchRange(MINTCDSuTCAFEDID, MAXTCDSuTCAFEDID);
90  dataMode_->setTesting(pset.getUntrackedParameter<bool>("testing", false));
91 
92  long autoRunNumber = -1;
93  if (fileListMode_) {
94  autoRunNumber = initFileList();
95  if (!fileListLoopMode_) {
96  if (autoRunNumber < 0)
97  throw cms::Exception("DAQSource::DAQSource") << "Run number not found from filename";
98  //override run number
99  runNumber_ = (edm::RunNumber_t)autoRunNumber;
100  daqDirector_->overrideRunNumber((unsigned int)autoRunNumber);
101  }
102  }
103 
104  dataMode_->makeDirectoryEntries(daqDirector_->getBUBaseDirs(), daqDirector_->runString());
105 
106  auto& daqProvenanceHelpers = dataMode_->makeDaqProvenanceHelpers();
107  for (const auto& daqProvenanceHelper : daqProvenanceHelpers)
108  processHistoryID_ = daqProvenanceHelper->daqInit(productRegistryUpdate(), processHistoryRegistryForUpdate());
109  setNewRun();
110  //todo:autodetect from file name (assert if names differ)
112 
113  //make sure that chunk size is N * block size
118 
119  if (!numBuffers_)
120  throw cms::Exception("DAQSource::DAQSource") << "no reading enabled with numBuffers parameter 0";
121 
123  assert(numBuffers_ > 1);
124  readingFilesCount_ = 0;
125 
126  if (!crc32c_hw_test())
127  edm::LogError("DAQSource::DAQSource") << "Intel crc32c checksum computation unavailable";
128 
129  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
130  if (fileListMode_) {
131  try {
133  } catch (cms::Exception const&) {
134  edm::LogInfo("DAQSource") << "No FastMonitoringService found in the configuration";
135  }
136  } else {
138  if (!fms_) {
139  throw cms::Exception("DAQSource") << "FastMonitoringService not found";
140  }
141  }
142 
144  if (!daqDirector_)
145  cms::Exception("DAQSource") << "EvFDaqDirector not found";
146 
147  edm::LogInfo("DAQSource") << "EvFDaqDirector/Source configured to use file service";
148  //set DaqDirector to delete files in preGlobalEndLumi callback
150  if (fms_) {
152  fms_->setInputSource(this);
155  }
156  //should delete chunks when run stops
157  for (unsigned int i = 0; i < numBuffers_; i++) {
159  }
160 
161  quit_threads_ = false;
162 
163  //prepare data shared by threads
164  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
165  thread_quit_signal.push_back(false);
166  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
167  cvReader_.push_back(std::make_unique<std::condition_variable>());
168  tid_active_.push_back(0);
169  }
170 
171  //start threads
172  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
173  //wait for each thread to complete initialization
174  std::unique_lock<std::mutex> lk(startupLock_);
175  workerThreads_.push_back(new std::thread(&DAQSource::readWorker, this, i));
176  startupCv_.wait(lk);
177  }
178 
179  runAuxiliary()->setProcessHistoryID(processHistoryID_);
180 }
long initFileList()
Definition: DAQSource.cc:1390
std::string const & runString() const
std::pair< RawInputFile *, InputChunk * > ReaderInfo
Definition: DAQSource.h:132
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:330
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
bool crc32c_hw_test()
Definition: crc32c.cc:354
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::condition_variable startupCv_
Definition: DAQSource.h:156
const bool useL1EventID_
Definition: DAQSource.h:103
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
const std::string dataModeConfig_
Definition: DAQSource.h:90
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
assert(be >=bs)
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void overrideRunNumber(unsigned int run)
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
uint64_t eventChunkSize_
Definition: DAQSource.h:91
static Timestamp beginOfTime()
Definition: Timestamp.h:77
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
friend struct InputChunk
Definition: DAQSource.h:43
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void readWorker(unsigned int tid)
Definition: DAQSource.cc:1054
unsigned int numBuffers_
Definition: DAQSource.h:95
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
const bool fileListMode_
Definition: DAQSource.h:109
void setInputSource(FedRawDataInputSource *inputSource)
Log< level::Info, false > LogInfo
const std::vector< unsigned int > testTCDSFEDRange_
Definition: DAQSource.h:104
def getRunNumber(filename)
void setInStateSup(FastMonState::InputState inputState)
unsigned long long uint64_t
Definition: Time.h:13
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:331
unsigned int readBlocks_
Definition: DAQSource.h:94
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:333
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
unsigned int RunNumber_t
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
void setFMS(evf::FastMonitoringService *fms)
const bool verifyChecksum_
Definition: DAQSource.h:102
std::vector< std::string > const & getBUBaseDirs() const
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159
void setInState(FastMonState::InputState inputState)

◆ ~DAQSource()

DAQSource::~DAQSource ( )
override

Definition at line 182 of file DAQSource.cc.

References currentFile_, cvReader_, evf::FastMonitoringService::exceptionDetected(), fileDeleteLock_, filesToDelete_, fms_, mps_fire::i, evf::FastMonitoringService::isExceptionOnData(), mReader_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, thread_quit_signal, and workerThreads_.

182  {
183  quit_threads_ = true;
184 
185  //delete any remaining open files
186  if (!fms_ || !fms_->exceptionDetected()) {
187  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
188  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++)
189  it->second.reset();
190  } else {
191  //skip deleting files with exception
192  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
193  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
194  if (fms_->isExceptionOnData(it->second->lumi_))
195  it->second->unsetDeleteFile();
196  else
197  it->second.reset();
198  }
199  //disable deleting current file with exception
200  if (currentFile_.get())
201  if (fms_->isExceptionOnData(currentFile_->lumi_))
202  currentFile_->unsetDeleteFile();
203  }
204 
206  readSupervisorThread_->join();
207  } else {
208  //join aux threads in case the supervisor thread was not started
209  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
210  std::unique_lock<std::mutex> lk(mReader_);
211  thread_quit_signal[i] = true;
212  cvReader_[i]->notify_one();
213  lk.unlock();
214  workerThreads_[i]->join();
215  delete workerThreads_[i];
216  }
217  }
218 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
std::mutex mReader_
Definition: DAQSource.h:148
bool isExceptionOnData(unsigned int ls)
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
bool startedSupervisorThread_
Definition: DAQSource.h:137
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next DAQSource::checkNext ( )
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 251 of file DAQSource.cc.

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, dataMode_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, evf::EvFDaqDirector::getEoRFilePathOnFU(), getNextDataBlock(), getNextEventFromDataBlock(), evf::FastMonState::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, svgfig::load(), evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), setMonState(), edm::shutdown_flag, startedSupervisorThread_, startupCv_, startupLock_, edm_modernize_messagelogger::stat, and mps_update::status.

251  {
253  std::unique_lock<std::mutex> lk(startupLock_);
254 
255  //this thread opens new files and dispatches reading to worker readers
256  readSupervisorThread_ = std::make_unique<std::thread>(&DAQSource::readSupervisor, this);
258 
259  startupCv_.wait(lk);
260  }
261 
262  //signal hltd to start event accounting
263  if (!currentLumiSection_)
266 
267  auto nextEvent = [this]() {
268  auto getNextEvent = [this]() {
269  //for some models this is always true (if one event is one block)
270  if (dataMode_->dataBlockCompleted()) {
271  return getNextDataBlock();
272  } else {
273  return getNextEventFromDataBlock();
274  }
275  };
276 
278  while ((status = getNextEvent()) == evf::EvFDaqDirector::noFile) {
279  if (edm::shutdown_flag.load(std::memory_order_relaxed))
280  break;
281  }
282  return status;
283  };
284 
285  switch (nextEvent()) {
287  //maybe create EoL file in working directory before ending run
288  struct stat buf;
289  //also create EoR file in FU data directory
290  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
291  if (!eorFound) {
292  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
293  O_RDWR | O_CREAT,
294  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
295  close(eor_fd);
296  }
298  eventsThisLumi_ = 0;
300  edm::LogInfo("DAQSource") << "----------------RUN ENDED----------------";
301  return Next::kStop;
302  }
304  //this is not reachable
305  return Next::kEvent;
306  }
308  //std::cout << "--------------NEW LUMI---------------" << std::endl;
309  return Next::kEvent;
310  }
311  default: {
314  else
315  eventRunNumber_ = dataMode_->run();
316 
317  setEventCached();
318 
319  return Next::kEvent;
320  }
321  }
322 }
edm::RunNumber_t runNumber_
Definition: DAQSource.h:114
std::unique_ptr< std::thread > readSupervisorThread_
Definition: DAQSource.h:138
std::mutex startupLock_
Definition: DAQSource.h:155
std::condition_variable startupCv_
Definition: DAQSource.h:156
volatile std::atomic< bool > shutdown_flag
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1369
unsigned int currentLumiSection_
Definition: DAQSource.h:119
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:344
std::string getEoRFilePathOnFU() const
evf::EvFDaqDirector::FileStatus getNextDataBlock()
Definition: DAQSource.cc:370
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
const bool fileListLoopMode_
Definition: DAQSource.h:111
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:362
const bool fileListMode_
Definition: DAQSource.h:109
void createProcessingNotificationMaybe() const
Log< level::Info, false > LogInfo
uint32_t eventRunNumber_
Definition: DAQSource.h:120
def load(fileName)
Definition: svgfig.py:547
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
bool startedSupervisorThread_
Definition: DAQSource.h:137
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1318
void readSupervisor()
Definition: DAQSource.cc:583

◆ currentLumiSection()

int DAQSource::currentLumiSection ( ) const
inline

Definition at line 52 of file DAQSource.h.

References currentLumiSection_.

Referenced by DataModeFRD::readEvent(), DataModeFRDStriped::readEvent(), and readSupervisor().

52 { return currentLumiSection_; }
unsigned int currentLumiSection_
Definition: DAQSource.h:119

◆ dataArranger()

void DAQSource::dataArranger ( )
private

Definition at line 581 of file DAQSource.cc.

581 {}

◆ eventRunNumber()

int DAQSource::eventRunNumber ( ) const
inline

Definition at line 53 of file DAQSource.h.

References eventRunNumber_.

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

53 { return eventRunNumber_; }
uint32_t eventRunNumber_
Definition: DAQSource.h:120

◆ exceptionState()

bool DAQSource::exceptionState ( )
inlineprivate

Definition at line 78 of file DAQSource.h.

References setExceptionState_.

Referenced by RawInputFile::advance().

78 { return setExceptionState_; }
bool setExceptionState_
Definition: DAQSource.h:154

◆ fileListLoopMode()

bool DAQSource::fileListLoopMode ( )
inline

Definition at line 57 of file DAQSource.h.

References fileListLoopMode_.

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

57 { return fileListLoopMode_; }
const bool fileListLoopMode_
Definition: DAQSource.h:111

◆ fillDescriptions()

void DAQSource::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 220 of file DAQSource.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, and AlCaHLTBitMon_QueryRunRegistry::string.

220  {
222  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk (unified)");
223  desc.addUntracked<std::string>("dataMode", "FRD")->setComment("Data mode: event 'FRD', 'FRSStriped', 'ScoutingRun2'");
224  desc.addUntracked<unsigned int>("eventChunkSize", 64)->setComment("Input buffer (chunk) size");
225  desc.addUntracked<unsigned int>("maxChunkSize", 0)
226  ->setComment("Maximum chunk size allowed if buffer is resized to fit data. If 0 is specifier, use chunk size");
227  desc.addUntracked<unsigned int>("eventChunkBlock", 0)
228  ->setComment(
229  "Block size used in a single file read call (must be smaller or equal to the initial chunk buffer size). If "
230  "0 is specified, use chunk size.");
231 
232  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
233  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
234  ->setComment("Maximum number of simultaneously buffered raw files");
235  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
236  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
237  desc.addUntracked<bool>("verifyChecksum", true)
238  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
239  desc.addUntracked<bool>("useL1EventID", false)
240  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
241  desc.addUntracked<std::vector<unsigned int>>("testTCDSFEDRange", std::vector<unsigned int>())
242  ->setComment("[min, max] range to search for TCDS FED ID in test setup");
243  desc.addUntracked<bool>("fileListMode", false)
244  ->setComment("Use fileNames parameter to directly specify raw files to open");
245  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
246  ->setComment("file list used when fileListMode is enabled");
247  desc.setAllowAnything();
248  descriptions.add("source", desc);
249 }
void add(std::string const &label, ParameterSetDescription const &psetDescription)

◆ getEventReport()

std::pair< bool, unsigned int > DAQSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1378 of file DAQSource.cc.

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

1378  {
1379  std::lock_guard<std::mutex> lock(monlock_);
1380  auto itr = sourceEventsReport_.find(lumi);
1381  if (itr != sourceEventsReport_.end()) {
1382  std::pair<bool, unsigned int> ret(true, itr->second);
1383  if (erase)
1384  sourceEventsReport_.erase(itr);
1385  return ret;
1386  } else
1387  return std::pair<bool, unsigned int>(false, 0);
1388 }
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
ret
prodAgent to be discontinued
std::mutex monlock_
Definition: DAQSource.h:174

◆ getFile()

evf::EvFDaqDirector::FileStatus DAQSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint64_t &  lockWaitTime 
)
private

Definition at line 1424 of file DAQSource.cc.

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, listFileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

1424  {
1425  if (fileListIndex_ < listFileNames_.size()) {
1426  nextFile = listFileNames_[fileListIndex_];
1427  if (nextFile.find("file://") == 0)
1428  nextFile = nextFile.substr(7);
1429  else if (nextFile.find("file:") == 0)
1430  nextFile = nextFile.substr(5);
1431  std::filesystem::path fileName = nextFile;
1432  std::string fileStem = fileName.stem().string();
1433  if (fileStem.find("ls"))
1434  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1435  if (fileStem.find('_'))
1436  fileStem = fileStem.substr(0, fileStem.find('_'));
1437 
1438  if (!fileListLoopMode_)
1439  ls = std::stoul(fileStem);
1440  else //always starting from LS 1 in loop mode
1441  ls = 1 + loopModeIterationInc_;
1442 
1443  //fsize = 0;
1444  //lockWaitTime = 0;
1445  fileListIndex_++;
1447  } else {
1448  if (!fileListLoopMode_)
1450  else {
1451  //loop through files until interrupted
1453  fileListIndex_ = 0;
1454  return getFile(ls, nextFile, lockWaitTime);
1455  }
1456  }
1457 }
unsigned int loopModeIterationInc_
Definition: DAQSource.h:112
unsigned int fileListIndex_
Definition: DAQSource.h:110
const bool fileListLoopMode_
Definition: DAQSource.h:111
def ls(path, rec=False)
Definition: eostools.py:349
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1424

◆ getNextDataBlock()

evf::EvFDaqDirector::FileStatus DAQSource::getNextDataBlock ( )
inlineprivate

Definition at line 370 of file DAQSource.cc.

References cms::cuda::assert(), chunkIsFree_, currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, dataMode_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, freeChunks_, getNextEventFromDataBlock(), evf::FastMonState::inChunkReceived, evf::FastMonState::inNewLumi, evf::FastMonState::inProcessingFile, evf::FastMonState::inRunEnd, evf::FastMonState::inWaitChunk, evf::FastMonState::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonState(), mps_update::status, threadError(), and mps_check::timeout.

Referenced by checkNext().

370  {
371  if (setExceptionState_)
372  threadError();
373  if (!currentFile_.get()) {
376  if (!fileQueue_.try_pop(currentFile_)) {
377  //sleep until wakeup (only in single-buffer mode) or timeout
378  std::unique_lock<std::mutex> lkw(mWakeup_);
379  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
381  }
382  status = currentFile_->status_;
385  currentFile_.reset();
386  return status;
387  } else if (status == evf::EvFDaqDirector::runAbort) {
388  throw cms::Exception("DAQSource::getNextDataBlock") << "Run has been aborted by the input source reader thread";
389  } else if (status == evf::EvFDaqDirector::newLumi) {
391  if (currentFile_->lumi_ > currentLumiSection_) {
393  eventsThisLumi_ = 0;
395  }
396  currentFile_.reset();
397  return status;
398  } else if (status == evf::EvFDaqDirector::newFile) {
400  } else
401  assert(false);
402  }
404 
405  //file is empty
406  if (!currentFile_->fileSize_) {
408  //try to open new lumi
409  assert(currentFile_->nChunks_ == 0);
410  if (currentFile_->lumi_ > currentLumiSection_) {
412  eventsThisLumi_ = 0;
414  }
415  //immediately delete empty file
416  currentFile_.reset();
418  }
419 
420  //file is finished
421  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
423  //release last chunk (it is never released elsewhere)
424  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
425  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
426  throw cms::Exception("DAQSource::getNextDataBlock")
427  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
428  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
429  }
431  //put the file in pending delete list;
432  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
433  filesToDelete_.push_back(
434  std::pair<int, std::unique_ptr<RawInputFile>>(currentFileIndex_, std::move(currentFile_)));
435  } else {
436  //in single-thread and stream jobs, events are already processed
437  currentFile_.reset();
438  }
440  }
441 
442  //assert(currentFile_->status_ == evf::EvFDaqDirector::newFile);
443 
444  //handle RAW file header
445  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
446  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
447  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
448  throw cms::Exception("DAQSource::getNextDataBlock") << "Premature end of input file while reading file header";
449 
450  edm::LogWarning("DAQSource") << "File with only raw header and no events received in LS " << currentFile_->lumi_;
451  if (currentFile_->lumi_ > currentLumiSection_) {
453  eventsThisLumi_ = 0;
455  }
456  }
457 
458  //advance buffer position to skip file header (chunk will be acquired later)
459  currentFile_->advance(currentFile_->rawHeaderSize_);
460  }
461 
462  //file is too short to fit event header
463  if (currentFile_->fileSizeLeft() < dataMode_->headerSize())
464  throw cms::Exception("DAQSource::getNextDataBlock")
465  << "Premature end of input file while reading event header. Missing: "
466  << (dataMode_->headerSize() - currentFile_->fileSizeLeft()) << " bytes";
467 
468  //multibuffer mode
469  //wait for the current chunk to become added to the vector
471  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
472  usleep(10000);
473  if (setExceptionState_)
474  threadError();
475  }
477 
478  chunkIsFree_ = false;
479  bool chunkEnd;
480  unsigned char* dataPosition;
481 
482  //read event header, copy it to a single chunk if necessary
483  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize());
484 
485  //get buffer size of current chunk (can be resized)
486  uint64_t currentChunkSize = currentFile_->currentChunkSize();
487 
488  //prepare view based on header that was read
489  dataMode_->makeDataBlockView(dataPosition, currentChunkSize, currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
490 
491  //check that payload size is within the file
492  const size_t msgSize = dataMode_->dataBlockSize() - dataMode_->headerSize();
493 
494  if (currentFile_->fileSizeLeft() < (int64_t)msgSize)
495  throw cms::Exception("DAQSource::getNextEventDataBlock")
496  << "Premature end of input file (missing:" << (msgSize - currentFile_->fileSizeLeft())
497  << ") while parsing block";
498 
499  //for cross-buffer models
500  if (chunkEnd) {
501  //header was at the chunk boundary, move payload into the starting chunk as well. No need to update block view here
502  currentFile_->moveToPreviousChunk(msgSize, dataMode_->headerSize());
503  //mark to release old chunk
504  chunkIsFree_ = true;
505  } else if (currentChunkSize - currentFile_->chunkPosition_ < msgSize) {
506  //header was contiguous, but payload does not fit in the chunk
507  //rewind to header start position and then together with payload will be copied together to the old chunk
508  currentFile_->rewindChunk(dataMode_->headerSize());
509 
511 
512  //do the copy to the beginning of the starting chunk. move pointers for next event in the next chunk
513  chunkEnd = currentFile_->advance(dataPosition, dataMode_->headerSize() + msgSize);
514  assert(chunkEnd);
515  //mark to release old chunk
516  chunkIsFree_ = true;
517 
519  //header and payload is moved, update view
520  dataMode_->makeDataBlockView(
521  dataPosition, currentFile_->currentChunkSize(), currentFile_->fileSizes_, currentFile_->rawHeaderSize_);
522  } else {
523  //everything is in a single chunk, only move pointers forward
524  chunkEnd = currentFile_->advance(dataPosition, msgSize);
525  assert(!chunkEnd);
526  chunkIsFree_ = false;
527  }
528 
529  //sanity-check check that the buffer position has not exceeded file size after preparing event
530  if (currentFile_->fileSize_ < currentFile_->bufferPosition_)
531  throw cms::Exception("DAQSource::getNextEventDataBlock")
532  << "Exceeded file size by " << (currentFile_->bufferPosition_ - currentFile_->fileSize_);
533 
534  //prepare event
535  return getNextEventFromDataBlock();
536 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
int currentFileIndex_
Definition: DAQSource.h:158
void threadError()
Definition: DAQSource.cc:1313
bool chunkIsFree_
Definition: DAQSource.h:135
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: DAQSource.cc:324
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: DAQSource.cc:1369
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::mutex mWakeup_
Definition: DAQSource.h:165
unsigned int currentLumiSection_
Definition: DAQSource.h:119
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
evf::EvFDaqDirector::FileStatus getNextEventFromDataBlock()
Definition: DAQSource.cc:344
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
const bool fileListMode_
Definition: DAQSource.h:109
unsigned long long uint64_t
Definition: Time.h:13
bool setExceptionState_
Definition: DAQSource.h:154
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
Log< level::Warning, false > LogWarning
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1318
def move(src, dest)
Definition: eostools.py:511
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159

◆ getNextEventFromDataBlock()

evf::EvFDaqDirector::FileStatus DAQSource::getNextEventFromDataBlock ( )
inlineprivate

Definition at line 344 of file DAQSource.cc.

References currentFile_, currentLumiSection_, dataMode_, Exception, fms_, newFWLiteAna::found, evf::FastMonState::inCachedEvent, evf::FastMonState::inChecksumEvent, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setMonState(), and verifyChecksum_.

Referenced by checkNext(), and getNextDataBlock().

344  {
346 
347  bool found = dataMode_->nextEventView();
348  //file(s) completely parsed
349  if (!found) {
350  if (dataMode_->dataBlockInitialized()) {
351  dataMode_->setDataBlockInitialized(false);
352  //roll position to the end of the file to close it
353  currentFile_->bufferPosition_ = currentFile_->fileSize_;
354  }
356  }
357 
358  if (verifyChecksum_ && !dataMode_->checksumValid()) {
359  if (fms_)
361  throw cms::Exception("DAQSource::getNextEventFromDataBlock") << dataMode_->getChecksumError();
362  }
364 
365  currentFile_->nProcessed_++;
366 
368 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
void setExceptionDetected(unsigned int ls)
unsigned int currentLumiSection_
Definition: DAQSource.h:119
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1318
const bool verifyChecksum_
Definition: DAQSource.h:102

◆ initFileList()

long DAQSource::initFileList ( )
private

Definition at line 1390 of file DAQSource.cc.

References a, b, mps_fire::end, cppFunctionSkipper::exception, MillePedeFileConverter_cfg::fileName, listFileNames_, castor_dqm_sourceclient_file_cfg::path, jetUpdater_cfi::sort, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by DAQSource().

1390  {
1392  if (a.rfind('/') != std::string::npos)
1393  a = a.substr(a.rfind('/'));
1394  if (b.rfind('/') != std::string::npos)
1395  b = b.substr(b.rfind('/'));
1396  return b > a;
1397  });
1398 
1399  if (!listFileNames_.empty()) {
1400  //get run number from first file in the vector
1402  std::string fileStem = fileName.stem().string();
1403  if (fileStem.find("file://") == 0)
1404  fileStem = fileStem.substr(7);
1405  else if (fileStem.find("file:") == 0)
1406  fileStem = fileStem.substr(5);
1407  auto end = fileStem.find('_');
1408 
1409  if (fileStem.find("run") == 0) {
1410  std::string runStr = fileStem.substr(3, end - 3);
1411  try {
1412  //get long to support test run numbers < 2^32
1413  long rval = std::stol(runStr);
1414  edm::LogInfo("DAQSource") << "Autodetected run number in fileListMode -: " << rval;
1415  return rval;
1416  } catch (const std::exception&) {
1417  edm::LogWarning("DAQSource") << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1418  }
1419  }
1420  }
1421  return -1;
1422 }
Log< level::Info, false > LogInfo
std::vector< std::string > listFileNames_
Definition: DAQSource.h:105
double b
Definition: hdecay.h:120
double a
Definition: hdecay.h:121
Log< level::Warning, false > LogWarning

◆ makeEventWrapper()

void DAQSource::makeEventWrapper ( edm::EventPrincipal eventPrincipal,
edm::EventAuxiliary aux 
)
inline

Definition at line 54 of file DAQSource.h.

References printConversionInfo::aux, and edm::RawInputSource::makeEvent().

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

54  {
55  makeEvent(eventPrincipal, aux);
56  }
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)

◆ maybeOpenNewLumiSection()

void DAQSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 324 of file DAQSource.cc.

References currentLumiSection_, edm::Timestamp::invalidTimestamp(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), and edm::InputSource::setLuminosityBlockAuxiliary().

Referenced by getNextDataBlock().

324  {
325  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
326  currentLumiSection_ = lumiSection;
327 
329 
330  timeval tv;
331  gettimeofday(&tv, nullptr);
332  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
333 
335  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
336 
337  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
338  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
339 
340  edm::LogInfo("DAQSource") << "New lumi section was opened. LUMI -: " << lumiSection;
341  }
342 }
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:232
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:457
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117
unsigned int currentLumiSection_
Definition: DAQSource.h:119
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:337
Log< level::Info, false > LogInfo
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:345
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:462
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:235

◆ processHistoryID()

edm::ProcessHistoryID& DAQSource::processHistoryID ( )
inline

Definition at line 59 of file DAQSource.h.

References processHistoryID_.

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

59 { return processHistoryID_; }
edm::ProcessHistoryID processHistoryID_
Definition: DAQSource.h:117

◆ read()

void DAQSource::read ( edm::EventPrincipal eventPrincipal)
overrideprotectedvirtual

Implements edm::RawInputSource.

Definition at line 538 of file DAQSource.cc.

References checkEvery_, chunkIsFree_, currentFile_, currentFileIndex_, dataMode_, eventsThisLumi_, fileDeleteLock_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonState::inNoRequest, evf::FastMonState::inReadCleanup, evf::FastMonState::inReadEvent, evf::FastMonitoringService::isExceptionOnData(), setMonState(), streamFileTracker_, and edm::EventPrincipal::streamID().

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), and readWorker().

538  {
540 
541  dataMode_->readEvent(eventPrincipal);
542 
543  eventsThisLumi_++;
545 
546  //resize vector if needed
547  while (streamFileTracker_.size() <= eventPrincipal.streamID())
548  streamFileTracker_.push_back(-1);
549 
550  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
551 
552  //this old file check runs no more often than every 10 events
553  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
554  //delete files that are not in processing
555  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
556  auto it = filesToDelete_.begin();
557  while (it != filesToDelete_.end()) {
558  bool fileIsBeingProcessed = false;
559  for (unsigned int i = 0; i < streamFileTracker_.size(); i++) {
560  if (it->first == streamFileTracker_.at(i)) {
561  fileIsBeingProcessed = true;
562  break;
563  }
564  }
565  if (!fileIsBeingProcessed && !(fms_ && fms_->isExceptionOnData(it->second->lumi_))) {
566  it = filesToDelete_.erase(it);
567  } else
568  it++;
569  }
570  }
571  if (dataMode_->dataBlockCompleted() && chunkIsFree_) {
572  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
573  chunkIsFree_ = false;
574  }
576  return;
577 }
std::unique_ptr< RawInputFile > currentFile_
Definition: DAQSource.h:134
int currentFileIndex_
Definition: DAQSource.h:158
unsigned int checkEvery_
Definition: DAQSource.h:162
bool chunkIsFree_
Definition: DAQSource.h:135
std::mutex fileDeleteLock_
Definition: DAQSource.h:160
StreamID streamID() const
bool isExceptionOnData(unsigned int ls)
unsigned int eventsThisLumi_
Definition: DAQSource.h:122
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
std::vector< int > streamFileTracker_
Definition: DAQSource.h:161
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
void setMonState(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1318
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: DAQSource.h:159

◆ readSupervisor()

void DAQSource::readSupervisor ( )
private

possibility to use by new formats

Definition at line 583 of file DAQSource.cc.

References addFile(), alwaysStartFromFirstLS_, cms::cuda::assert(), evf::EvFDaqDirector::buBaseRunDir(), visDQMUpload::buf, currentLumiSection(), cvReader_, cvWakeup_, daqDirector_, dataMode_, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, evf::EvFDaqDirector::getEoRFileName(), getFile(), evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::getStartLumisectionFromEnv(), mps_fire::i, evf::EvFDaqDirector::inputThrottled(), evf::FastMonState::inRunEnd, evf::FastMonState::inSupBusy, evf::FastMonState::inSupFileLimit, evf::FastMonState::inSupLockPolling, evf::FastMonState::inSupNewFile, evf::FastMonState::inSupNewFileWaitChunk, evf::FastMonState::inSupNewFileWaitChunkCopying, evf::FastMonState::inSupNewFileWaitThread, evf::FastMonState::inSupNewFileWaitThreadCopying, evf::FastMonState::inSupNoFile, evf::FastMonState::inSupWaitFreeChunk, evf::FastMonState::inSupWaitFreeChunkCopying, evf::FastMonState::inSupWaitFreeThread, evf::FastMonState::inSupWaitFreeThreadCopying, createfilelist::int, evf::FastMonState::inThrottled, dqmiolumiharvest::j, LogDebug, eostools::ls(), evf::EvFDaqDirector::lumisectionDiscarded(), SiStripPI::max, maxBufferedFiles_, maxChunkSize_, SiStripPI::min, eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::numConcurrentLumis(), evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, conifer::pow(), quit_threads_, RawInputFile, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), rng_, evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, setMonStateSup(), edm::shutdown_flag, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, edm_modernize_messagelogger::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

583  {
584  bool stop = false;
585  unsigned int currentLumiSection = 0;
586 
587  {
588  std::unique_lock<std::mutex> lk(startupLock_);
589  startupCv_.notify_one();
590  }
591 
592  uint32_t ls = 0;
593  uint32_t monLS = 1;
594  uint32_t lockCount = 0;
595  uint64_t sumLockWaitTimeUs = 0.;
596 
597  bool requireHeader = dataMode_->requireHeader();
598 
599  while (!stop) {
600  //wait for at least one free thread and chunk
601  int counter = 0;
602 
603  while (workerPool_.empty() || freeChunks_.empty() || readingFilesCount_ >= maxBufferedFiles_) {
604  //report state to monitoring
605  if (fms_) {
606  bool copy_active = false;
607  for (auto j : tid_active_)
608  if (j)
609  copy_active = true;
612  else if (freeChunks_.empty()) {
613  if (copy_active)
615  else
617  } else {
618  if (copy_active)
620  else
622  }
623  }
624  std::unique_lock<std::mutex> lkw(mWakeup_);
625  //sleep until woken up by condition or a timeout
626  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
627  counter++;
628  if (!(counter % 6000)) {
629  edm::LogWarning("FedRawDataInputSource")
630  << "No free chunks or threads. Worker pool empty:" << workerPool_.empty()
631  << ", free chunks empty:" << freeChunks_.empty() << ", number of files buffered:" << readingFilesCount_
632  << " / " << maxBufferedFiles_;
633  }
634  LogDebug("DAQSource") << "No free chunks or threads...";
635  } else {
636  assert(!workerPool_.empty() || freeChunks_.empty());
637  }
638  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
639  stop = true;
640  break;
641  }
642  }
643  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
644 
645  if (stop)
646  break;
647 
648  //look for a new file
649  std::string nextFile;
650  int64_t fileSizeFromMetadata;
651 
652  if (fms_) {
655  }
656  bool fitToBuffer = dataMode_->fitToBuffer();
657 
659  uint16_t rawHeaderSize = 0;
660  uint32_t lsFromRaw = 0;
661  int32_t serverEventsInNewFile = -1;
662  int rawFd = -1;
663 
664  int backoff_exp = 0;
665 
666  //entering loop which tries to grab new file from ramdisk
668  //check if hltd has signalled to throttle input
669  counter = 0;
670  while (daqDirector_->inputThrottled()) {
671  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed))
672  break;
673 
674  unsigned int nConcurrentLumis = daqDirector_->numConcurrentLumis();
675  unsigned int nOtherLumis = nConcurrentLumis > 0 ? nConcurrentLumis - 1 : 0;
676  unsigned int checkLumiStart = currentLumiSection > nOtherLumis ? currentLumiSection - nOtherLumis : 1;
677  bool hasDiscardedLumi = false;
678  for (unsigned int i = checkLumiStart; i <= currentLumiSection; i++) {
680  edm::LogWarning("DAQSource") << "Source detected that the lumisection is discarded -: " << i;
681  hasDiscardedLumi = true;
682  break;
683  }
684  }
685  if (hasDiscardedLumi)
686  break;
687 
689  if (!(counter % 50))
690  edm::LogWarning("DAQSource") << "Input throttled detected, reading files is paused...";
691  usleep(100000);
692  counter++;
693  }
694 
695  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
696  stop = true;
697  break;
698  }
699 
700  assert(rawFd == -1);
701  uint64_t thisLockWaitTimeUs = 0.;
703  if (fileListMode_) {
704  //return LS if LS not set, otherwise return file
705  status = getFile(ls, nextFile, thisLockWaitTimeUs);
707  uint16_t rawDataType;
709  rawFd,
710  rawHeaderSize,
711  rawDataType,
712  lsFromRaw,
713  serverEventsInNewFile,
714  fileSizeFromMetadata,
715  requireHeader,
716  false,
717  false) != 0) {
718  //error
719  setExceptionState_ = true;
720  stop = true;
721  break;
722  }
723  }
724  } else {
726  ls,
727  nextFile,
728  rawFd,
729  rawHeaderSize, //which format?
730  serverEventsInNewFile,
731  fileSizeFromMetadata,
732  thisLockWaitTimeUs,
733  requireHeader);
734  }
735 
737 
738  //cycle through all remaining LS even if no files get assigned
741 
742  //monitoring of lock wait time
743  if (thisLockWaitTimeUs > 0.)
744  sumLockWaitTimeUs += thisLockWaitTimeUs;
745  lockCount++;
746  if (ls > monLS) {
747  monLS = ls;
748  if (lockCount)
749  if (fms_)
750  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
751  lockCount = 0;
752  sumLockWaitTimeUs = 0;
753  }
754 
756  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded));
757  stop = true;
758  break;
759  }
760 
761  //error from filelocking function
763  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
764  stop = true;
765  break;
766  }
767  //queue new lumisection
768  if (ls > currentLumiSection) {
769  //new file service
772  //start transitions from LS specified by env, continue if not reached
773  if (ls < daqDirector_->getStartLumisectionFromEnv()) {
774  //skip file if from earlier LS than specified by env
775  if (rawFd != -1) {
776  close(rawFd);
777  rawFd = -1;
778  }
780  continue;
781  } else {
782  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
783  }
784  } else if (ls < 100) {
785  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
786  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
787 
788  for (unsigned int nextLS = std::min(lsToStart, ls); nextLS <= ls; nextLS++) {
789  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
790  }
791  } else {
792  //start from current LS
793  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, ls));
794  }
795  } else {
796  //queue all lumisections after last one seen to avoid gaps
797  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
798  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::newLumi, nextLS));
799  }
800  }
802  }
803  //else
805  edm::LogError("DAQSource") << "Got old LS (" << ls
806  << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
807  << ". Aborting execution." << std::endl;
808  if (rawFd != -1)
809  close(rawFd);
810  rawFd = -1;
811  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runAbort, 0));
812  stop = true;
813  break;
814  }
815 
816  int dbgcount = 0;
819  dbgcount++;
820  if (!(dbgcount % 20))
821  LogDebug("DAQSource") << "No file for me... sleep and try again...";
822 
823  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
824  //backoff_exp=0; // disabled!
825  int sleeptime = (int)(100000. * pow(2, backoff_exp));
826  usleep(sleeptime);
827  backoff_exp++;
828  } else
829  backoff_exp = 0;
830  }
831  //end of file grab loop, parse result
834  LogDebug("DAQSource") << "The director says to grab -: " << nextFile;
835 
836  std::string rawFile;
837  //file service will report raw extension
838  rawFile = nextFile;
839 
840  struct stat st;
841  int stat_res = stat(rawFile.c_str(), &st);
842  if (stat_res == -1) {
843  edm::LogError("DAQSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
844  setExceptionState_ = true;
845  break;
846  }
847  uint64_t fileSize = st.st_size;
848 
849  if (fms_) {
853  }
854  int eventsInNewFile;
855  if (fileListMode_) {
856  if (fileSize == 0)
857  eventsInNewFile = 0;
858  else
859  eventsInNewFile = -1;
860  } else {
861  eventsInNewFile = serverEventsInNewFile;
862  assert(eventsInNewFile >= 0);
863  assert((eventsInNewFile > 0) ==
864  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
865  }
866 
867  std::pair<bool, std::vector<std::string>> additionalFiles =
868  dataMode_->defineAdditionalFiles(rawFile, fileListMode_);
869  if (!additionalFiles.first) {
870  //skip secondary files from file broker
871  if (rawFd > -1)
872  close(rawFd);
873  continue;
874  }
875 
876  std::unique_ptr<RawInputFile> newInputFile(new RawInputFile(evf::EvFDaqDirector::FileStatus::newFile,
877  ls,
878  rawFile,
879  !fileListMode_,
880  rawFd,
881  fileSize,
882  rawHeaderSize, //for which format
883  0,
884  eventsInNewFile,
885  this));
886 
887  uint64_t neededSize = fileSize;
888  for (const auto& addFile : additionalFiles.second) {
889  struct stat buf;
890  //wait for secondary files to appear
891  unsigned int fcnt = 0;
892  while (stat(addFile.c_str(), &buf) != 0) {
893  if (fileListMode_) {
894  edm::LogError("DAQSource") << "additional file is missing -: " << addFile;
895  stop = true;
896  setExceptionState_ = true;
897  break;
898  }
899  usleep(10000);
900  fcnt++;
901  //report and EoR check every 30 seconds
902  if ((fcnt && fcnt % 3000 == 0) || quit_threads_.load(std::memory_order_relaxed)) {
903  edm::LogWarning("DAQSource") << "Additional file is still missing after 30 seconds -: " << addFile;
904  struct stat bufEoR;
905  auto secondaryPath = std::filesystem::path(addFile).parent_path();
907  std::string mainEoR = (std::filesystem::path(daqDirector_->buBaseRunDir()) / eorName).generic_string();
908  std::string secondaryEoR = (secondaryPath / eorName).generic_string();
909  bool prematureEoR = false;
910  if (stat(secondaryEoR.c_str(), &bufEoR) == 0) {
911  if (stat(addFile.c_str(), &bufEoR) != 0) {
912  edm::LogError("DAQSource")
913  << "EoR file appeared in -: " << secondaryPath << " while waiting for index file " << addFile;
914  prematureEoR = true;
915  }
916  } else if (stat(mainEoR.c_str(), &bufEoR) == 0) {
917  //wait another 10 seconds
918  usleep(10000000);
919  if (stat(addFile.c_str(), &bufEoR) != 0) {
920  edm::LogError("DAQSource")
921  << "Main EoR file appeared -: " << mainEoR << " while waiting for index file " << addFile;
922  prematureEoR = true;
923  }
924  }
925  if (prematureEoR) {
926  //queue EoR since this is not FU error
927  fileQueue_.push(std::make_unique<RawInputFile>(evf::EvFDaqDirector::runEnded, 0));
928  stop = true;
929  break;
930  }
931  }
932 
933  if (quit_threads_) {
934  edm::LogError("DAQSource") << "Quitting while waiting for file -: " << addFile;
935  stop = true;
936  setExceptionState_ = true;
937  break;
938  }
939  }
940  LogDebug("DAQSource") << " APPEND NAME " << addFile;
941  if (stop)
942  break;
943 
944  newInputFile->appendFile(addFile, buf.st_size);
945  neededSize += buf.st_size;
946  }
947  if (stop)
948  break;
949 
950  //calculate number of needed chunks and size if resizing will be applied
951  uint16_t neededChunks;
952  uint64_t chunkSize;
953 
954  if (fitToBuffer) {
955  chunkSize = std::min(maxChunkSize_, std::max(eventChunkSize_, neededSize));
956  neededChunks = 1;
957  } else {
958  chunkSize = eventChunkSize_;
959  neededChunks = neededSize / eventChunkSize_ + uint16_t((neededSize % eventChunkSize_) > 0);
960  }
961  newInputFile->setChunks(neededChunks);
962 
963  newInputFile->randomizeOrder(rng_);
964 
966  auto newInputFilePtr = newInputFile.get();
967  fileQueue_.push(std::move(newInputFile));
968 
969  for (size_t i = 0; i < neededChunks; i++) {
970  if (fms_) {
971  bool copy_active = false;
972  for (auto j : tid_active_)
973  if (j)
974  copy_active = true;
975  if (copy_active)
977  else
979  }
980  //get thread
981  unsigned int newTid = 0xffffffff;
982  while (!workerPool_.try_pop(newTid)) {
983  usleep(100000);
984  if (quit_threads_.load(std::memory_order_relaxed)) {
985  stop = true;
986  break;
987  }
988  }
989 
990  if (fms_) {
991  bool copy_active = false;
992  for (auto j : tid_active_)
993  if (j)
994  copy_active = true;
995  if (copy_active)
997  else
999  }
1000  InputChunk* newChunk = nullptr;
1001  while (!freeChunks_.try_pop(newChunk)) {
1002  usleep(100000);
1003  if (quit_threads_.load(std::memory_order_relaxed)) {
1004  stop = true;
1005  break;
1006  }
1007  }
1008 
1009  if (newChunk == nullptr) {
1010  //return unused tid if we received shutdown (nullptr chunk)
1011  if (newTid != 0xffffffff)
1012  workerPool_.push(newTid);
1013  stop = true;
1014  break;
1015  }
1016  if (stop)
1017  break;
1019 
1020  std::unique_lock<std::mutex> lk(mReader_);
1021 
1022  uint64_t toRead = chunkSize;
1023  if (i == (uint64_t)neededChunks - 1 && neededSize % chunkSize)
1024  toRead = neededSize % chunkSize;
1025  newChunk->reset(i * chunkSize, toRead, i);
1026 
1027  workerJob_[newTid].first = newInputFilePtr;
1028  workerJob_[newTid].second = newChunk;
1029 
1030  //wake up the worker thread
1031  cvReader_[newTid]->notify_one();
1032  }
1033  }
1034  }
1036  //make sure threads finish reading
1037  unsigned int numFinishedThreads = 0;
1038  while (numFinishedThreads < workerThreads_.size()) {
1039  unsigned int tid = 0;
1040  while (!workerPool_.try_pop(tid)) {
1041  usleep(10000);
1042  }
1043  std::unique_lock<std::mutex> lk(mReader_);
1044  thread_quit_signal[tid] = true;
1045  cvReader_[tid]->notify_one();
1046  numFinishedThreads++;
1047  }
1048  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1049  workerThreads_[i]->join();
1050  delete workerThreads_[i];
1051  }
1052 }
std::string & buBaseRunDir()
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::default_random_engine rng_
Definition: DAQSource.h:124
std::condition_variable startupCv_
Definition: DAQSource.h:156
volatile std::atomic< bool > shutdown_flag
constexpr int pow(int x)
Definition: conifer.h:24
bool lumisectionDiscarded(unsigned int ls)
std::vector< std::thread * > workerThreads_
Definition: DAQSource.h:140
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
unsigned int numConcurrentLumis() const
int timeout
Definition: mps_check.py:53
assert(be >=bs)
std::mutex mWakeup_
Definition: DAQSource.h:165
std::atomic< unsigned int > readingFilesCount_
Definition: DAQSource.h:98
uint64_t eventChunkSize_
Definition: DAQSource.h:91
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint16_t &rawDataType, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs, bool requireHeader=true)
int currentLumiSection() const
Definition: DAQSource.h:52
std::condition_variable cvWakeup_
Definition: DAQSource.h:166
void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex)
unsigned int maxBufferedFiles_
Definition: DAQSource.h:96
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152
const bool alwaysStartFromFirstLS_
Definition: DAQSource.h:101
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
tbb::concurrent_queue< std::unique_ptr< RawInputFile > > fileQueue_
Definition: DAQSource.h:146
const bool fileListMode_
Definition: DAQSource.h:109
def ls(path, rec=False)
Definition: eostools.py:349
unsigned int getStartLumisectionFromEnv() const
unsigned long long uint64_t
Definition: Time.h:13
std::string getEoRFileName() const
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
void stoppedLookingForFile(unsigned int lumi)
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
void setMonStateSup(evf::FastMonState::InputState state)
Definition: DAQSource.cc:1323
friend class RawInputFile
Definition: DAQSource.h:42
bool setExceptionState_
Definition: DAQSource.h:154
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
unsigned int getLumisectionToStart() const
evf::EvFDaqDirector * daqDirector_
Definition: DAQSource.h:88
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: DAQSource.h:145
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
Log< level::Warning, false > LogWarning
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint64_t &lockWaitTime)
Definition: DAQSource.cc:1424
def move(src, dest)
Definition: eostools.py:511
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
#define LogDebug(id)

◆ readWorker()

void DAQSource::readWorker ( unsigned int  tid)
private

Definition at line 1054 of file DAQSource.cc.

References addFile(), cms::cuda::assert(), InputChunk::buf_, cvReader_, dataMode_, change_name::diff, mps_fire::end, eventChunkBlock_, eventChunkSize_, geometryDiff::file, InputChunk::fileIndex_, dqmdumpme::first, mps_fire::i, dqmiolumiharvest::j, dqmdumpme::last, LogDebug, maxChunkSize_, SiStripPI::min, mReader_, submitPVValidationJobs::now, numConcurrentReads_, InputChunk::offset_, read(), InputChunk::readComplete_, InputChunk::resize(), alignCSCRings::s, edm::second(), setExceptionState_, InputChunk::size_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by DAQSource().

1054  {
1055  bool init = true;
1056  threadInit_.exchange(true, std::memory_order_acquire);
1057 
1058  while (true) {
1059  tid_active_[tid] = false;
1060  std::unique_lock<std::mutex> lk(mReader_);
1061  workerJob_[tid].first = nullptr;
1062  workerJob_[tid].first = nullptr;
1063 
1064  assert(!thread_quit_signal[tid]); //should never get it here
1065  workerPool_.push(tid);
1066 
1067  if (init) {
1068  std::unique_lock<std::mutex> lk(startupLock_);
1069  init = false;
1070  startupCv_.notify_one();
1071  }
1072  cvReader_[tid]->wait(lk);
1073 
1074  if (thread_quit_signal[tid])
1075  return;
1076  tid_active_[tid] = true;
1077 
1078  RawInputFile* file;
1079  InputChunk* chunk;
1080 
1081  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1082 
1083  file = workerJob_[tid].first;
1084  chunk = workerJob_[tid].second;
1085 
1086  bool fitToBuffer = dataMode_->fitToBuffer();
1087 
1088  //resize if multi-chunked reading is not possible
1089  if (fitToBuffer) {
1090  uint64_t accum = 0;
1091  for (auto s : file->diskFileSizes_)
1092  accum += s;
1093  if (accum > eventChunkSize_) {
1094  if (!chunk->resize(accum, maxChunkSize_)) {
1095  edm::LogError("DAQSource")
1096  << "maxChunkSize can not accomodate the file set. Try increasing chunk size and/or chunk maximum size.";
1097  if (file->rawFd_ != -1 && (numConcurrentReads_ == 1 || chunk->offset_ == 0))
1098  close(file->rawFd_);
1099  setExceptionState_ = true;
1100  continue;
1101  } else {
1102  edm::LogInfo("DAQSource") << "chunk size was increased to " << (chunk->size_ >> 20) << " MB";
1103  }
1104  }
1105  }
1106 
1107  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1108  unsigned int bufferLeftInitial = (chunk->offset_ == 0 && file->rawFd_ != -1) ? file->rawHeaderSize_ : 0;
1109  const uint16_t readBlocks = chunk->size_ / eventChunkBlock_ + uint16_t(chunk->size_ % eventChunkBlock_ > 0);
1110 
1111  auto readPrimary = [&](uint64_t bufferLeft) {
1112  //BEGIN reading primary file - check if file descriptor is already open
1113  //in multi-threaded chunked mode, only first thread will use already open fd for reading the first file
1114  //fd will not be closed in other case (used by other threads)
1115  int fileDescriptor = -1;
1116  bool fileOpenedHere = false;
1117 
1118  if (numConcurrentReads_ == 1) {
1119  fileDescriptor = file->rawFd_;
1120  file->rawFd_ = -1;
1121  if (fileDescriptor == -1) {
1122  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1123  fileOpenedHere = true;
1124  }
1125  } else {
1126  if (chunk->offset_ == 0) {
1127  fileDescriptor = file->rawFd_;
1128  file->rawFd_ = -1;
1129  if (fileDescriptor == -1) {
1130  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1131  fileOpenedHere = true;
1132  }
1133  } else {
1134  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1135  fileOpenedHere = true;
1136  }
1137  }
1138 
1139  if (fileDescriptor == -1) {
1140  edm::LogError("DAQSource") << "readWorker failed to open file -: " << file->fileName_
1141  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1142  setExceptionState_ = true;
1143  return;
1144  }
1145 
1146  if (fileOpenedHere) { //fast forward to this chunk position
1147  off_t pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1148  if (pos == -1) {
1149  edm::LogError("DAQSource") << "readWorker failed to seek file -: " << file->fileName_
1150  << " fd:" << fileDescriptor << " to offset " << chunk->offset_
1151  << " error: " << strerror(errno);
1152  setExceptionState_ = true;
1153  return;
1154  }
1155  }
1156 
1157  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1158  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1159 
1160  size_t skipped = bufferLeft;
1162  for (unsigned int i = 0; i < readBlocks; i++) {
1163  ssize_t last;
1164  edm::LogInfo("DAQSource") << "readWorker read -: " << (int64_t)(chunk->usedSize_ - bufferLeft) << " or "
1165  << (int64_t)eventChunkBlock_;
1166 
1167  //protect against reading into next block
1168  last = ::read(fileDescriptor,
1169  (void*)(chunk->buf_ + bufferLeft),
1170  std::min((int64_t)(chunk->usedSize_ - bufferLeft), (int64_t)eventChunkBlock_));
1171 
1172  if (last < 0) {
1173  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1174  << " fd:" << fileDescriptor << " last: " << last << " error: " << strerror(errno);
1175  setExceptionState_ = true;
1176  break;
1177  }
1178  if (last > 0) {
1179  bufferLeft += last;
1180  }
1181  if ((uint64_t)last < eventChunkBlock_) { //last read
1182  edm::LogInfo("DAQSource") << "chunkUsedSize" << chunk->usedSize_ << " u-s:" << (chunk->usedSize_ - skipped)
1183  << " ix:" << i * eventChunkBlock_ << " " << (size_t)last;
1184  //check if this is last block if single file, then total read size must match file size
1185  if (file->numFiles_ == 1 && !(chunk->usedSize_ - skipped == i * eventChunkBlock_ + (size_t)last)) {
1186  edm::LogError("DAQSource") << "readWorker failed to read file -: " << file->fileName_
1187  << " fd:" << fileDescriptor << " last:" << last
1188  << " expectedChunkSize:" << chunk->usedSize_
1189  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last)
1190  << " skipped:" << skipped << " block:" << (i + 1) << "/" << readBlocks
1191  << " error: " << strerror(errno);
1192  setExceptionState_ = true;
1193  }
1194  break;
1195  }
1196  }
1197  if (setExceptionState_)
1198  return;
1199 
1200  file->fileSizes_[0] = bufferLeft;
1201 
1202  if (chunk->offset_ + bufferLeft == file->diskFileSizes_[0] || bufferLeft == chunk->size_) {
1203  //file reading finished using this fd
1204  //or the whole buffer is filled (single sequential file spread over more chunks)
1205  close(fileDescriptor);
1206  fileDescriptor = -1;
1207  } else
1208  assert(fileDescriptor == -1);
1209 
1210  if (fitToBuffer && bufferLeft != file->diskFileSizes_[0]) {
1211  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[0]
1212  << " read:" << bufferLeft << " expected:" << file->diskFileSizes_[0];
1213  setExceptionState_ = true;
1214  return;
1215  }
1216 
1218  auto diff = end - start;
1219  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1220  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1221  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1222  << " GB/s)";
1223  };
1224  //END primary function
1225 
1226  //SECONDARY files function
1227  auto readSecondary = [&](uint64_t bufferLeft, unsigned int j) {
1228  size_t fileLen = 0;
1229 
1230  std::string const& addFile = file->fileNames_[j];
1231  int fileDescriptor = open(addFile.c_str(), O_RDONLY);
1232 
1233  if (fileDescriptor < 0) {
1234  edm::LogError("DAQSource") << "readWorker failed to open file -: " << addFile << " fd:" << fileDescriptor
1235  << " error: " << strerror(errno);
1236  setExceptionState_ = true;
1237  return;
1238  }
1239 
1240  LogDebug("DAQSource") << "Reader thread opened file -: TID: " << tid << " file: " << addFile << " at offset "
1241  << lseek(fileDescriptor, 0, SEEK_CUR);
1242 
1243  //size_t skipped = 0;//file is newly opened, read with header
1245  for (unsigned int i = 0; i < readBlocks; i++) {
1246  ssize_t last;
1247 
1248  //protect against reading into next block
1249  //use bufferLeft for the write offset
1250  last = ::read(fileDescriptor,
1251  (void*)(chunk->buf_ + bufferLeft),
1252  std::min((uint64_t)file->diskFileSizes_[j], (uint64_t)eventChunkBlock_));
1253 
1254  if (last < 0) {
1255  edm::LogError("DAQSource") << "readWorker failed to read file -: " << addFile << " fd:" << fileDescriptor
1256  << " error: " << strerror(errno);
1257  setExceptionState_ = true;
1258  close(fileDescriptor);
1259  break;
1260  }
1261  if (last > 0) {
1262  bufferLeft += last;
1263  fileLen += last;
1264  file->fileSize_ += last;
1265  }
1266  };
1267 
1268  close(fileDescriptor);
1269  file->fileSizes_[j] = fileLen;
1270  assert(fileLen > 0);
1271 
1272  if (fitToBuffer && fileLen != file->diskFileSizes_[j]) {
1273  edm::LogError("DAQSource") << "mismatch between read file size for file -: " << file->fileNames_[j]
1274  << " read:" << fileLen << " expected:" << file->diskFileSizes_[j];
1275  setExceptionState_ = true;
1276  return;
1277  }
1278 
1280  auto diff = end - start;
1281  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1282  LogDebug("DAQSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1283  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1284  << " GB/s)";
1285  };
1286 
1287  //randomized order multi-file loop
1288  for (unsigned int j : file->fileOrder_) {
1289  if (j == 0) {
1290  readPrimary(bufferLeftInitial);
1291  } else
1292  readSecondary(file->bufferOffsets_[j], j);
1293 
1294  if (setExceptionState_)
1295  break;
1296  }
1297 
1298  if (setExceptionState_)
1299  continue;
1300 
1301  //detect FRD event version. Skip file Header if it exists
1302  if (dataMode_->dataVersion() == 0 && chunk->offset_ == 0) {
1303  dataMode_->detectVersion(chunk->buf_, file->rawHeaderSize_);
1304  }
1305  assert(dataMode_->versionCheck());
1306 
1307  chunk->readComplete_ =
1308  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1309  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1310  }
1311 }
Definition: start.py:1
uint64_t maxChunkSize_
Definition: DAQSource.h:92
std::mutex startupLock_
Definition: DAQSource.h:155
std::condition_variable startupCv_
Definition: DAQSource.h:156
std::atomic< bool > threadInit_
Definition: DAQSource.h:171
void read(edm::EventPrincipal &eventPrincipal) override
Definition: DAQSource.cc:538
Log< level::Error, false > LogError
std::mutex mReader_
Definition: DAQSource.h:148
assert(be >=bs)
uint64_t eventChunkSize_
Definition: DAQSource.h:91
U second(std::pair< T, U > const &p)
uint64_t eventChunkBlock_
Definition: DAQSource.h:93
unsigned char * buf_
std::shared_ptr< DataMode > dataMode_
Definition: DAQSource.h:176
unsigned int fileIndex_
Log< level::Info, false > LogInfo
std::atomic< bool > readComplete_
Definition: init.py:1
unsigned long long uint64_t
Definition: Time.h:13
std::vector< unsigned int > tid_active_
Definition: DAQSource.h:150
std::vector< unsigned int > thread_quit_signal
Definition: DAQSource.h:153
bool setExceptionState_
Definition: DAQSource.h:154
std::vector< std::unique_ptr< std::condition_variable > > cvReader_
Definition: DAQSource.h:149
unsigned int numConcurrentReads_
Definition: DAQSource.h:97
int addFile(MEStore &micromes, int fd)
Definition: fastHadd.cc:352
tbb::concurrent_queue< unsigned int > workerPool_
Definition: DAQSource.h:142
bool resize(uint64_t wantedSize, uint64_t maxSize)
std::vector< ReaderInfo > workerJob_
Definition: DAQSource.h:143
#define LogDebug(id)

◆ reportEventsThisLumiInSource()

void DAQSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1369 of file DAQSource.cc.

References events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextDataBlock().

1369  {
1370  std::lock_guard<std::mutex> lock(monlock_);
1371  auto itr = sourceEventsReport_.find(lumi);
1372  if (itr != sourceEventsReport_.end())
1373  itr->second += events;
1374  else
1376 }
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: DAQSource.h:173
int events
std::mutex monlock_
Definition: DAQSource.h:174

◆ rewind_()

void DAQSource::rewind_ ( )
overrideprivatevirtual

Reimplemented from edm::InputSource.

Definition at line 579 of file DAQSource.cc.

579 {}

◆ setMonState()

void DAQSource::setMonState ( evf::FastMonState::InputState  state)
protected

Definition at line 1318 of file DAQSource.cc.

References fms_, evf::FastMonitoringService::setInState(), and edm::InputSource::state().

Referenced by RawInputFile::advance(), checkNext(), getNextDataBlock(), getNextEventFromDataBlock(), and read().

1318  {
1319  if (fms_)
1320  fms_->setInState(state);
1321 }
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
ItemType state() const
Definition: InputSource.h:332
void setInState(FastMonState::InputState inputState)

◆ setMonStateSup()

void DAQSource::setMonStateSup ( evf::FastMonState::InputState  state)
protected

Definition at line 1323 of file DAQSource.cc.

References fms_, evf::FastMonitoringService::setInStateSup(), and edm::InputSource::state().

Referenced by readSupervisor().

1323  {
1324  if (fms_)
1326 }
evf::FastMonitoringService * fms_
Definition: DAQSource.h:87
void setInStateSup(FastMonState::InputState inputState)
ItemType state() const
Definition: InputSource.h:332

◆ threadError()

void DAQSource::threadError ( )
private

Definition at line 1313 of file DAQSource.cc.

References Exception, and quit_threads_.

Referenced by RawInputFile::advance(), and getNextDataBlock().

1313  {
1314  quit_threads_ = true;
1315  throw cms::Exception("DAQSource:threadError") << " file reader thread error ";
1316 }
std::atomic< bool > quit_threads_
Definition: DAQSource.h:152

◆ useL1EventID()

bool DAQSource::useL1EventID ( ) const
inline

Definition at line 51 of file DAQSource.h.

References useL1EventID_.

Referenced by DataModeFRD::readEvent(), and DataModeFRDStriped::readEvent().

51 { return useL1EventID_; }
const bool useL1EventID_
Definition: DAQSource.h:103

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 43 of file DAQSource.h.

Referenced by DAQSource().

◆ RawInputFile

friend class RawInputFile
friend

Definition at line 42 of file DAQSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool DAQSource::alwaysStartFromFirstLS_
private

Definition at line 101 of file DAQSource.h.

Referenced by readSupervisor().

◆ checkEvery_

unsigned int DAQSource::checkEvery_ = 10
private

Definition at line 162 of file DAQSource.h.

Referenced by read().

◆ chunkIsFree_

bool DAQSource::chunkIsFree_ = false
private

Definition at line 135 of file DAQSource.h.

Referenced by getNextDataBlock(), and read().

◆ currentFile_

std::unique_ptr<RawInputFile> DAQSource::currentFile_
private

Definition at line 134 of file DAQSource.h.

Referenced by getNextDataBlock(), getNextEventFromDataBlock(), read(), and ~DAQSource().

◆ currentFileIndex_

int DAQSource::currentFileIndex_ = -1
private

Definition at line 158 of file DAQSource.h.

Referenced by getNextDataBlock(), and read().

◆ currentLumiSection_

unsigned int DAQSource::currentLumiSection_
private

◆ cvReader_

std::vector<std::unique_ptr<std::condition_variable> > DAQSource::cvReader_
private

Definition at line 149 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), readWorker(), and ~DAQSource().

◆ cvWakeup_

std::condition_variable DAQSource::cvWakeup_
private

Definition at line 166 of file DAQSource.h.

Referenced by getNextDataBlock(), and readSupervisor().

◆ daqDirector_

evf::EvFDaqDirector* DAQSource::daqDirector_ = nullptr
private

Definition at line 88 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), getNextDataBlock(), and readSupervisor().

◆ dataArrangerThread_

std::unique_ptr<std::thread> DAQSource::dataArrangerThread_
private

Definition at line 139 of file DAQSource.h.

◆ dataMode_

std::shared_ptr<DataMode> DAQSource::dataMode_
private

◆ dataModeConfig_

const std::string DAQSource::dataModeConfig_
private

Definition at line 90 of file DAQSource.h.

Referenced by DAQSource().

◆ eventChunkBlock_

uint64_t DAQSource::eventChunkBlock_
private

Definition at line 93 of file DAQSource.h.

Referenced by DAQSource(), and readWorker().

◆ eventChunkSize_

uint64_t DAQSource::eventChunkSize_
private

Definition at line 91 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ eventRunNumber_

uint32_t DAQSource::eventRunNumber_ = 0
private

Definition at line 120 of file DAQSource.h.

Referenced by checkNext(), and eventRunNumber().

◆ eventsThisLumi_

unsigned int DAQSource::eventsThisLumi_
private

Definition at line 122 of file DAQSource.h.

Referenced by checkNext(), getNextDataBlock(), and read().

◆ eventsThisRun_

unsigned long DAQSource::eventsThisRun_ = 0
private

Definition at line 123 of file DAQSource.h.

◆ fileDeleteLock_

std::mutex DAQSource::fileDeleteLock_
private

Definition at line 160 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), read(), and ~DAQSource().

◆ fileDescriptor_

int DAQSource::fileDescriptor_ = -1
private

Definition at line 169 of file DAQSource.h.

◆ fileListIndex_

unsigned int DAQSource::fileListIndex_ = 0
private

Definition at line 110 of file DAQSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool DAQSource::fileListLoopMode_
private

Definition at line 111 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), fileListLoopMode(), and getFile().

◆ fileListMode_

const bool DAQSource::fileListMode_
private

Definition at line 109 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), getNextDataBlock(), and readSupervisor().

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<RawInputFile> > DAQSource::fileQueue_
private

Definition at line 146 of file DAQSource.h.

Referenced by getNextDataBlock(), and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > DAQSource::filesToDelete_
private

Definition at line 159 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), read(), and ~DAQSource().

◆ fms_

evf::FastMonitoringService* DAQSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> DAQSource::freeChunks_
private

Definition at line 145 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), read(), and readSupervisor().

◆ fuOutputDir_

std::string DAQSource::fuOutputDir_
private

Definition at line 115 of file DAQSource.h.

◆ GTPEventID_

uint32_t DAQSource::GTPEventID_ = 0
private

Definition at line 121 of file DAQSource.h.

◆ listFileNames_

std::vector<std::string> DAQSource::listFileNames_
private

Definition at line 105 of file DAQSource.h.

Referenced by getFile(), and initFileList().

◆ loopModeIterationInc_

unsigned int DAQSource::loopModeIterationInc_ = 0
private

Definition at line 112 of file DAQSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int DAQSource::maxBufferedFiles_
private

Definition at line 96 of file DAQSource.h.

Referenced by readSupervisor().

◆ maxChunkSize_

uint64_t DAQSource::maxChunkSize_
private

Definition at line 92 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ monlock_

std::mutex DAQSource::monlock_
private

Definition at line 174 of file DAQSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

◆ mReader_

std::mutex DAQSource::mReader_
private

Definition at line 148 of file DAQSource.h.

Referenced by readSupervisor(), readWorker(), and ~DAQSource().

◆ mWakeup_

std::mutex DAQSource::mWakeup_
private

Definition at line 165 of file DAQSource.h.

Referenced by getNextDataBlock(), and readSupervisor().

◆ numBuffers_

unsigned int DAQSource::numBuffers_
private

Definition at line 95 of file DAQSource.h.

Referenced by DAQSource().

◆ numConcurrentReads_

unsigned int DAQSource::numConcurrentReads_
private

Definition at line 97 of file DAQSource.h.

Referenced by DAQSource(), and readWorker().

◆ processHistoryID_

edm::ProcessHistoryID DAQSource::processHistoryID_
private

Definition at line 117 of file DAQSource.h.

Referenced by DAQSource(), maybeOpenNewLumiSection(), and processHistoryID().

◆ quit_threads_

std::atomic<bool> DAQSource::quit_threads_
private

Definition at line 152 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), threadError(), and ~DAQSource().

◆ readBlocks_

unsigned int DAQSource::readBlocks_
private

Definition at line 94 of file DAQSource.h.

Referenced by DAQSource().

◆ readingFilesCount_

std::atomic<unsigned int> DAQSource::readingFilesCount_
private

Definition at line 98 of file DAQSource.h.

Referenced by DAQSource(), getNextDataBlock(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> DAQSource::readSupervisorThread_
private

Definition at line 138 of file DAQSource.h.

Referenced by checkNext(), and ~DAQSource().

◆ rng_

std::default_random_engine DAQSource::rng_
private

Definition at line 124 of file DAQSource.h.

Referenced by readSupervisor().

◆ runNumber_

edm::RunNumber_t DAQSource::runNumber_
private

Definition at line 114 of file DAQSource.h.

Referenced by checkNext(), and DAQSource().

◆ setExceptionState_

bool DAQSource::setExceptionState_ = false
private

Definition at line 154 of file DAQSource.h.

Referenced by exceptionState(), getNextDataBlock(), readSupervisor(), and readWorker().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> DAQSource::sourceEventsReport_
private

Definition at line 173 of file DAQSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool DAQSource::startedSupervisorThread_ = false
private

Definition at line 137 of file DAQSource.h.

Referenced by checkNext(), and ~DAQSource().

◆ startupCv_

std::condition_variable DAQSource::startupCv_
private

Definition at line 156 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), readSupervisor(), and readWorker().

◆ startupLock_

std::mutex DAQSource::startupLock_
private

Definition at line 155 of file DAQSource.h.

Referenced by checkNext(), DAQSource(), readSupervisor(), and readWorker().

◆ streamFileTracker_

std::vector<int> DAQSource::streamFileTracker_
private

Definition at line 161 of file DAQSource.h.

Referenced by read().

◆ testTCDSFEDRange_

const std::vector<unsigned int> DAQSource::testTCDSFEDRange_
private

Definition at line 104 of file DAQSource.h.

Referenced by DAQSource().

◆ thread_quit_signal

std::vector<unsigned int> DAQSource::thread_quit_signal
private

Definition at line 153 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), readWorker(), and ~DAQSource().

◆ threadInit_

std::atomic<bool> DAQSource::threadInit_
private

Definition at line 171 of file DAQSource.h.

Referenced by readWorker().

◆ tid_active_

std::vector<unsigned int> DAQSource::tid_active_
private

Definition at line 150 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool DAQSource::useFileBroker_
private

Definition at line 106 of file DAQSource.h.

◆ useL1EventID_

const bool DAQSource::useL1EventID_
private

Definition at line 103 of file DAQSource.h.

Referenced by useL1EventID().

◆ verifyChecksum_

const bool DAQSource::verifyChecksum_
private

Definition at line 102 of file DAQSource.h.

Referenced by getNextEventFromDataBlock().

◆ workerJob_

std::vector<ReaderInfo> DAQSource::workerJob_
private

Definition at line 143 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> DAQSource::workerPool_
private

Definition at line 142 of file DAQSource.h.

Referenced by readSupervisor(), and readWorker().

◆ workerThreads_

std::vector<std::thread*> DAQSource::workerThreads_
private

Definition at line 140 of file DAQSource.h.

Referenced by DAQSource(), readSupervisor(), and ~DAQSource().