CMS 3D CMS Logo

List of all members | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | Friends
FedRawDataInputSource Class Reference

#include <FedRawDataInputSource.h>

Inheritance diagram for FedRawDataInputSource:
edm::RawInputSource edm::InputSource

Public Member Functions

 FedRawDataInputSource (edm::ParameterSet const &, edm::InputSourceDescription const &)
 
std::pair< bool, unsigned int > getEventReport (unsigned int lumi, bool erase)
 
 ~FedRawDataInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (InputSource const &)=delete
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void read (edm::EventPrincipal &eventPrincipal) override
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Sets the event-cached flag reported by eventCached() (original brief was garbled in the source comment; confirm wording). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Private Types

typedef std::pair< InputFile *, InputChunk * > ReaderInfo
 

Private Member Functions

bool exceptionState ()
 
edm::Timestamp fillFEDRawDataCollection (FEDRawDataCollection &)
 
evf::EvFDaqDirector::FileStatus getFile (unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
 
evf::EvFDaqDirector::FileStatus getNextEvent ()
 
long initFileList ()
 
void maybeOpenNewLumiSection (const uint32_t lumiSection)
 
evf::EvFDaqDirector::FileStatus nextEvent ()
 
void readNextChunkIntoBuffer (InputFile *file)
 
void readSupervisor ()
 
void readWorker (unsigned int tid)
 
void reportEventsThisLumiInSource (unsigned int lumi, unsigned int events)
 
void rewind_ () override
 
void threadError ()
 

Private Attributes

const bool alwaysStartFromFirstLS_
 
uint32_t bufferInputRead_ = 0
 
unsigned int checkEvery_ = 10
 
bool chunkIsFree_ = false
 
std::unique_ptr< InputFile > currentFile_
 
int currentFileIndex_ = -1
 
unsigned int currentLumiSection_
 
std::vector< std::condition_variable * > cvReader_
 
std::condition_variable cvWakeup_
 
evf::EvFDaqDirector * daqDirector_ = nullptr
 
const edm::DaqProvenanceHelper daqProvenanceHelper_
 
std::string defPath_
 
uint32 detectedFRDversion_ = 0
 
std::unique_ptr< FRDEventMsgView > event_
 
unsigned int eventChunkBlock_
 
unsigned int eventChunkSize_
 
edm::EventID eventID_
 
uint32_t eventRunNumber_ = 0
 
unsigned int eventsThisLumi_
 
unsigned long eventsThisRun_ = 0
 
std::mutex fileDeleteLock_
 
int fileDescriptor_ = -1
 
unsigned int fileListIndex_ = 0
 
const bool fileListLoopMode_
 
const bool fileListMode_
 
std::vector< std::string > fileNames_
 
std::list< std::pair< int, std::string > > fileNamesToDelete_
 
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
 
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
 
evf::FastMonitoringService * fms_ = nullptr
 
tbb::concurrent_queue< InputChunk * > freeChunks_
 
std::string fuOutputDir_
 
const bool getLSFromFilename_
 
uint32_t GTPEventID_ = 0
 
uint32_t L1EventID_ = 0
 
unsigned int loopModeIterationInc_ = 0
 
unsigned int maxBufferedFiles_
 
std::mutex monlock_
 
std::mutex mReader_
 
std::mutex mWakeup_
 
unsigned int nStreams_ = 0
 
unsigned int numBuffers_
 
unsigned int numConcurrentReads_
 
edm::ProcessHistoryID processHistoryID_
 
std::atomic< bool > quit_threads_
 
unsigned int readBlocks_
 
std::atomic< unsigned int > readingFilesCount_
 
std::unique_ptr< std::thread > readSupervisorThread_
 
edm::RunNumber_t runNumber_
 
bool setExceptionState_ = false
 
bool singleBufferMode_
 
std::map< unsigned int, unsigned int > sourceEventsReport_
 
bool startedSupervisorThread_ = false
 
std::condition_variable startupCv_
 
std::mutex startupLock_
 
std::vector< int > streamFileTracker_
 
unsigned char * tcds_pointer_
 
std::vector< unsigned int > thread_quit_signal
 
std::atomic< bool > threadInit_
 
std::vector< unsigned int > tid_active_
 
bool useFileBroker_
 
const bool useL1EventID_
 
const bool verifyChecksum_
 
std::vector< ReaderInfo > workerJob_
 
tbb::concurrent_queue< unsigned int > workerPool_
 
std::vector< std::thread * > workerThreads_
 

Friends

struct InputChunk
 
struct InputFile
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 36 of file FedRawDataInputSource.h.

Member Typedef Documentation

◆ ReaderInfo

Definition at line 128 of file FedRawDataInputSource.h.

Constructor & Destructor Documentation

◆ FedRawDataInputSource()

FedRawDataInputSource::FedRawDataInputSource ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)
explicit

Definition at line 53 of file FedRawDataInputSource.cc.

54  : edm::RawInputSource(pset, desc),
55  defPath_(pset.getUntrackedParameter<std::string>("buDefPath", "")),
56  eventChunkSize_(pset.getUntrackedParameter<unsigned int>("eventChunkSize", 32) * 1048576),
57  eventChunkBlock_(pset.getUntrackedParameter<unsigned int>("eventChunkBlock", 32) * 1048576),
58  numBuffers_(pset.getUntrackedParameter<unsigned int>("numBuffers", 2)),
59  maxBufferedFiles_(pset.getUntrackedParameter<unsigned int>("maxBufferedFiles", 2)),
60  getLSFromFilename_(pset.getUntrackedParameter<bool>("getLSFromFilename", true)),
61  alwaysStartFromFirstLS_(pset.getUntrackedParameter<bool>("alwaysStartFromFirstLS", false)),
62  verifyChecksum_(pset.getUntrackedParameter<bool>("verifyChecksum", true)),
63  useL1EventID_(pset.getUntrackedParameter<bool>("useL1EventID", false)),
64  fileNames_(pset.getUntrackedParameter<std::vector<std::string>>("fileNames", std::vector<std::string>())),
65  fileListMode_(pset.getUntrackedParameter<bool>("fileListMode", false)),
66  fileListLoopMode_(pset.getUntrackedParameter<bool>("fileListLoopMode", false)),
69  eventID_(),
72  tcds_pointer_(nullptr),
73  eventsThisLumi_(0) {
74  char thishost[256];
75  gethostname(thishost, 255);
76  edm::LogInfo("FedRawDataInputSource") << "Construction. read-ahead chunk size -: " << std::endl
77  << (eventChunkSize_ / 1048576) << " MB on host " << thishost;
78 
79  long autoRunNumber = -1;
80  if (fileListMode_) {
81  autoRunNumber = initFileList();
82  if (!fileListLoopMode_) {
83  if (autoRunNumber < 0)
84  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource") << "Run number not found from filename";
85  //override run number
86  runNumber_ = (edm::RunNumber_t)autoRunNumber;
87  edm::Service<evf::EvFDaqDirector>()->overrideRunNumber((unsigned int)autoRunNumber);
88  }
89  }
90 
92  setNewRun();
93  //todo:autodetect from file name (assert if names differ)
95 
96  //make sure that chunk size is N * block size
101 
102  if (!numBuffers_)
103  throw cms::Exception("FedRawDataInputSource::FedRawDataInputSource")
104  << "no reading enabled with numBuffers parameter 0";
105 
108  readingFilesCount_ = 0;
109 
110  if (!crc32c_hw_test())
111  edm::LogError("FedRawDataInputSource::FedRawDataInputSource") << "Intel crc32c checksum computation unavailable";
112 
113  //get handles to DaqDirector and FastMonitoringService because getting them isn't possible in readSupervisor thread
114  if (fileListMode_) {
115  try {
116  fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
117  } catch (cms::Exception const&) {
118  edm::LogInfo("FedRawDataInputSource") << "No FastMonitoringService found in the configuration";
119  }
120  } else {
121  fms_ = static_cast<evf::FastMonitoringService*>(edm::Service<evf::MicroStateService>().operator->());
122  if (!fms_) {
123  throw cms::Exception("FedRawDataInputSource") << "FastMonitoringService not found";
124  }
125  }
126 
128  if (!daqDirector_)
129  cms::Exception("FedRawDataInputSource") << "EvFDaqDirector not found";
130 
132  if (useFileBroker_)
133  edm::LogInfo("FedRawDataInputSource") << "EvFDaqDirector/Source configured to use file service";
134  //set DaqDirector to delete files in preGlobalEndLumi callback
136  if (fms_) {
138  fms_->setInputSource(this);
141  }
142  //should delete chunks when run stops
143  for (unsigned int i = 0; i < numBuffers_; i++) {
145  }
146 
147  quit_threads_ = false;
148 
149  for (unsigned int i = 0; i < numConcurrentReads_; i++) {
150  std::unique_lock<std::mutex> lk(startupLock_);
151  //issue a memory fence here and in threads (constructor was segfaulting without this)
152  thread_quit_signal.push_back(false);
153  workerJob_.push_back(ReaderInfo(nullptr, nullptr));
154  cvReader_.push_back(new std::condition_variable);
155  tid_active_.push_back(0);
156  threadInit_.store(false, std::memory_order_release);
157  workerThreads_.push_back(new std::thread(&FedRawDataInputSource::readWorker, this, i));
158  startupCv_.wait(lk);
159  }
160 
161  runAuxiliary()->setProcessHistoryID(processHistoryID_);
162 }

References cms::cuda::assert(), edm::Timestamp::beginOfTime(), crc32c_hw_test(), cvReader_, daqDirector_, edm::DaqProvenanceHelper::daqInit(), daqProvenanceHelper_, eventChunkBlock_, eventChunkSize_, Exception, fileDeleteLock_, fileListLoopMode_, fileListMode_, filesToDelete_, fms_, freeChunks_, mps_fire::i, evf::FastMonitoringThread::inInit, initFileList(), InputChunk, edm::Timestamp::invalidTimestamp(), numBuffers_, numConcurrentReads_, Utilities::operator, processHistoryID_, edm::InputSource::processHistoryRegistryForUpdate(), edm::InputSource::productRegistryUpdate(), quit_threads_, readBlocks_, readingFilesCount_, readWorker(), edm::InputSource::runAuxiliary(), runNumber_, evf::EvFDaqDirector::setDeleteTracking(), evf::EvFDaqDirector::setFMS(), evf::FastMonitoringService::setInputSource(), evf::FastMonitoringService::setInState(), evf::FastMonitoringService::setInStateSup(), edm::InputSource::setNewRun(), edm::InputSource::setRunAuxiliary(), singleBufferMode_, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, evf::EvFDaqDirector::useFileBroker(), useFileBroker_, workerJob_, and workerThreads_.

◆ ~FedRawDataInputSource()

FedRawDataInputSource::~FedRawDataInputSource ( )
override

Definition at line 164 of file FedRawDataInputSource.cc.

164  {
165  quit_threads_ = true;
166 
167  //delete any remaining open files
168  for (auto it = filesToDelete_.begin(); it != filesToDelete_.end(); it++) {
169  std::string fileToDelete = it->second->fileName_;
170  it->second.release();
171  }
173  readSupervisorThread_->join();
174  } else {
175  //join aux threads in case the supervisor thread was not started
176  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
177  std::unique_lock<std::mutex> lk(mReader_);
178  thread_quit_signal[i] = true;
179  cvReader_[i]->notify_one();
180  lk.unlock();
181  workerThreads_[i]->join();
182  delete workerThreads_[i];
183  }
184  }
185  for (unsigned int i = 0; i < numConcurrentReads_; i++)
186  delete cvReader_[i];
187  /*
188  for (unsigned int i=0;i<numConcurrentReads_+1;i++) {
189  InputChunk *ch;
190  while (!freeChunks_.try_pop(ch)) {}
191  delete ch;
192  }
193  */
194 }

References cvReader_, filesToDelete_, mps_fire::i, mReader_, numConcurrentReads_, quit_threads_, readSupervisorThread_, startedSupervisorThread_, AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, and workerThreads_.

Member Function Documentation

◆ checkNext()

edm::RawInputSource::Next FedRawDataInputSource::checkNext ( )
override protected virtual

Implements edm::RawInputSource.

Definition at line 219 of file FedRawDataInputSource.cc.

219  {
221  //this thread opens new files and dispatches reading to worker readers
222  //threadInit_.store(false,std::memory_order_release);
223  std::unique_lock<std::mutex> lk(startupLock_);
224  readSupervisorThread_.reset(new std::thread(&FedRawDataInputSource::readSupervisor, this));
226  startupCv_.wait(lk);
227  }
228  //signal hltd to start event accounting
229  if (!currentLumiSection_)
231  if (fms_)
233  switch (nextEvent()) {
235  //maybe create EoL file in working directory before ending run
236  struct stat buf;
237  if (!useFileBroker_ && currentLumiSection_ > 0) {
238  bool eolFound = (stat(daqDirector_->getEoLSFilePathOnBU(currentLumiSection_).c_str(), &buf) == 0);
239  if (eolFound) {
241  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
242  if (!found) {
244  int eol_fd =
245  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
246  close(eol_fd);
248  }
249  }
250  }
251  //also create EoR file in FU data directory
252  bool eorFound = (stat(daqDirector_->getEoRFilePathOnFU().c_str(), &buf) == 0);
253  if (!eorFound) {
254  int eor_fd = open(daqDirector_->getEoRFilePathOnFU().c_str(),
255  O_RDWR | O_CREAT,
256  S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
257  close(eor_fd);
258  }
260  eventsThisLumi_ = 0;
262  edm::LogInfo("FedRawDataInputSource") << "----------------RUN ENDED----------------";
263  return Next::kStop;
264  }
266  //this is not reachable
267  return Next::kEvent;
268  }
270  //std::cout << "--------------NEW LUMI---------------" << std::endl;
271  return Next::kEvent;
272  }
273  default: {
274  if (!getLSFromFilename_) {
275  //get new lumi from file header
276  if (event_->lumi() > currentLumiSection_) {
278  eventsThisLumi_ = 0;
280  }
281  }
284  else
285  eventRunNumber_ = event_->run();
286  L1EventID_ = event_->event();
287 
288  setEventCached();
289 
290  return Next::kEvent;
291  }
292  }
293 }

References visDQMUpload::buf, evf::EvFDaqDirector::createProcessingNotificationMaybe(), currentLumiSection_, daqDirector_, event_, eventRunNumber_, eventsThisLumi_, fileListLoopMode_, fileListMode_, fms_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnBU(), evf::EvFDaqDirector::getEoLSFilePathOnFU(), evf::EvFDaqDirector::getEoRFilePathOnFU(), getLSFromFilename_, evf::FastMonitoringThread::inWaitInput, edm::RawInputSource::kEvent, edm::RawInputSource::kStop, L1EventID_, evf::EvFDaqDirector::lockFULocal2(), maybeOpenNewLumiSection(), evf::EvFDaqDirector::newLumi, nextEvent(), evf::EvFDaqDirector::noFile, readSupervisor(), readSupervisorThread_, reportEventsThisLumiInSource(), edm::InputSource::resetLuminosityBlockAuxiliary(), evf::EvFDaqDirector::runEnded, runNumber_, edm::InputSource::setEventCached(), evf::FastMonitoringService::setInState(), startedSupervisorThread_, startupCv_, startupLock_, hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

◆ exceptionState()

bool FedRawDataInputSource::exceptionState ( )
inline private

Definition at line 62 of file FedRawDataInputSource.h.

62 { return setExceptionState_; }

References setExceptionState_.

Referenced by InputFile::advance().

◆ fillDescriptions()

void FedRawDataInputSource::fillDescriptions ( edm::ConfigurationDescriptions &  descriptions)
static

Definition at line 196 of file FedRawDataInputSource.cc.

196  {
198  desc.setComment("File-based Filter Farm input source for reading raw data from BU ramdisk");
199  desc.addUntracked<unsigned int>("eventChunkSize", 32)->setComment("Input buffer (chunk) size");
200  desc.addUntracked<unsigned int>("eventChunkBlock", 32)
201  ->setComment("Block size used in a single file read call (must be smaller or equal to buffer size)");
202  desc.addUntracked<unsigned int>("numBuffers", 2)->setComment("Number of buffers used for reading input");
203  desc.addUntracked<unsigned int>("maxBufferedFiles", 2)
204  ->setComment("Maximum number of simultaneously buffered raw files");
205  desc.addUntracked<unsigned int>("alwaysStartFromfirstLS", false)
206  ->setComment("Force source to start from LS 1 if server provides higher lumisection number");
207  desc.addUntracked<bool>("verifyChecksum", true)
208  ->setComment("Verify event CRC-32C checksum of FRDv5 and higher or Adler32 with v3 and v4");
209  desc.addUntracked<bool>("useL1EventID", false)
210  ->setComment("Use L1 event ID from FED header if true or from TCDS FED if false");
211  desc.addUntracked<bool>("fileListMode", false)
212  ->setComment("Use fileNames parameter to directly specify raw files to open");
213  desc.addUntracked<std::vector<std::string>>("fileNames", std::vector<std::string>())
214  ->setComment("file list used when fileListMode is enabled");
215  desc.setAllowAnything();
216  descriptions.add("source", desc);
217 }

References edm::ConfigurationDescriptions::add(), edm::ParameterSetDescription::addUntracked(), edm::ParameterSetDescription::setAllowAnything(), and edm::ParameterSetDescription::setComment().

◆ fillFEDRawDataCollection()

edm::Timestamp FedRawDataInputSource::fillFEDRawDataCollection ( FEDRawDataCollection &  rawData)
private

Definition at line 683 of file FedRawDataInputSource.cc.

683  {
685  timeval stv;
686  gettimeofday(&stv, nullptr);
687  time = stv.tv_sec;
688  time = (time << 32) + stv.tv_usec;
689  edm::Timestamp tstamp(time);
690 
691  uint32_t eventSize = event_->eventSize();
692  unsigned char* event = (unsigned char*)event_->payload();
693  GTPEventID_ = 0;
694  tcds_pointer_ = nullptr;
695  while (eventSize > 0) {
696  assert(eventSize >= FEDTrailer::length);
697  eventSize -= FEDTrailer::length;
698  const FEDTrailer fedTrailer(event + eventSize);
699  const uint32_t fedSize = fedTrailer.fragmentLength() << 3; //trailer length counts in 8 bytes
700  assert(eventSize >= fedSize - FEDHeader::length);
701  eventSize -= (fedSize - FEDHeader::length);
702  const FEDHeader fedHeader(event + eventSize);
703  const uint16_t fedId = fedHeader.sourceID();
705  throw cms::Exception("FedRawDataInputSource::fillFEDRawDataCollection") << "Out of range FED ID : " << fedId;
706  }
708  tcds_pointer_ = event + eventSize;
709  }
711  if (evf::evtn::evm_board_sense(event + eventSize, fedSize))
712  GTPEventID_ = evf::evtn::get(event + eventSize, true);
713  else
714  GTPEventID_ = evf::evtn::get(event + eventSize, false);
715  //evf::evtn::evm_board_setformat(fedSize);
716  const uint64_t gpsl = evf::evtn::getgpslow(event + eventSize);
717  const uint64_t gpsh = evf::evtn::getgpshigh(event + eventSize);
718  tstamp = edm::Timestamp(static_cast<edm::TimeValue_t>((gpsh << 32) + gpsl));
719  }
720  //take event ID from GTPE FED
722  if (evf::evtn::gtpe_board_sense(event + eventSize)) {
723  GTPEventID_ = evf::evtn::gtpe_get(event + eventSize);
724  }
725  }
726  FEDRawData& fedData = rawData.FEDData(fedId);
727  fedData.resize(fedSize);
728  memcpy(fedData.data(), event + eventSize, fedSize);
729  }
730  assert(eventSize == 0);
731 
732  return tstamp;
733 }

References cms::cuda::assert(), FEDRawData::data(), event_, evf::evtn::evm_board_sense(), Exception, l1tstage2_dqm_sourceclient-live_cfg::fedId, FEDTrailer::fragmentLength(), evf::evtn::get(), evf::evtn::getgpshigh(), evf::evtn::getgpslow(), evf::evtn::gtpe_board_sense(), evf::evtn::gtpe_get(), GTPEventID_, FEDHeader::length, FEDTrailer::length, FEDNumbering::MAXFEDID, FEDNumbering::MINTCDSuTCAFEDID, FEDNumbering::MINTriggerEGTPFEDID, FEDNumbering::MINTriggerGTPFEDID, l1tstage2_dqm_sourceclient-live_cfg::rawData, FEDRawData::resize(), FEDHeader::sourceID(), tcds_pointer_, and ntuplemaker::time.

Referenced by read().

◆ getEventReport()

std::pair< bool, unsigned int > FedRawDataInputSource::getEventReport ( unsigned int  lumi,
bool  erase 
)

Definition at line 1467 of file FedRawDataInputSource.cc.

1467  {
1468  std::lock_guard<std::mutex> lock(monlock_);
1469  auto itr = sourceEventsReport_.find(lumi);
1470  if (itr != sourceEventsReport_.end()) {
1471  std::pair<bool, unsigned int> ret(true, itr->second);
1472  if (erase)
1473  sourceEventsReport_.erase(itr);
1474  return ret;
1475  } else
1476  return std::pair<bool, unsigned int>(false, 0);
1477 }

References CommonMethods::lock(), monlock_, runTheMatrix::ret, and sourceEventsReport_.

Referenced by evf::FastMonitoringService::preGlobalEndLumi().

◆ getFile()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getFile ( unsigned int &  ls,
std::string &  nextFile,
uint32_t &  fsize,
uint64_t &  lockWaitTime 
)
private

Definition at line 1509 of file FedRawDataInputSource.cc.

1512  {
1513  if (fileListIndex_ < fileNames_.size()) {
1514  nextFile = fileNames_[fileListIndex_];
1515  if (nextFile.find("file://") == 0)
1516  nextFile = nextFile.substr(7);
1517  else if (nextFile.find("file:") == 0)
1518  nextFile = nextFile.substr(5);
1519  boost::filesystem::path fileName = nextFile;
1520  std::string fileStem = fileName.stem().string();
1521  if (fileStem.find("ls"))
1522  fileStem = fileStem.substr(fileStem.find("ls") + 2);
1523  if (fileStem.find("_"))
1524  fileStem = fileStem.substr(0, fileStem.find("_"));
1525 
1526  if (!fileListLoopMode_)
1527  ls = boost::lexical_cast<unsigned int>(fileStem);
1528  else //always starting from LS 1 in loop mode
1529  ls = 1 + loopModeIterationInc_;
1530 
1531  //fsize = 0;
1532  //lockWaitTime = 0;
1533  fileListIndex_++;
1535  } else {
1536  if (!fileListLoopMode_)
1538  else {
1539  //loop through files until interrupted
1541  fileListIndex_ = 0;
1542  return getFile(ls, nextFile, fsize, lockWaitTime);
1543  }
1544  }
1545 }

References fileListIndex_, fileListLoopMode_, MillePedeFileConverter_cfg::fileName, fileNames_, loopModeIterationInc_, eostools::ls(), evf::EvFDaqDirector::newFile, castor_dqm_sourceclient_file_cfg::path, evf::EvFDaqDirector::runEnded, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by readSupervisor().

◆ getNextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::getNextEvent ( )
inline private

Definition at line 341 of file FedRawDataInputSource.cc.

341  {
342  if (setExceptionState_)
343  threadError();
344  if (!currentFile_.get()) {
346  if (fms_)
348  if (!fileQueue_.try_pop(currentFile_)) {
349  //sleep until wakeup (only in single-buffer mode) or timeout
350  std::unique_lock<std::mutex> lkw(mWakeup_);
351  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout || !currentFile_.get())
353  }
354  status = currentFile_->status_;
356  if (fms_)
358  currentFile_.release();
359  return status;
360  } else if (status == evf::EvFDaqDirector::runAbort) {
361  throw cms::Exception("FedRawDataInputSource::getNextEvent")
362  << "Run has been aborted by the input source reader thread";
363  } else if (status == evf::EvFDaqDirector::newLumi) {
364  if (fms_)
366  if (getLSFromFilename_) {
367  if (currentFile_->lumi_ > currentLumiSection_) {
369  eventsThisLumi_ = 0;
371  }
372  } else { //let this be picked up from next event
374  }
375 
376  currentFile_.release();
377  return status;
378  } else if (status == evf::EvFDaqDirector::newFile) {
380  } else
381  assert(false);
382  }
383  if (fms_)
385 
386  //file is empty
387  if (!currentFile_->fileSize_) {
389  //try to open new lumi
390  assert(currentFile_->nChunks_ == 0);
391  if (getLSFromFilename_)
392  if (currentFile_->lumi_ > currentLumiSection_) {
394  eventsThisLumi_ = 0;
396  }
397  //immediately delete empty file
398  std::string currentName = currentFile_->fileName_;
399  currentFile_.release();
401  }
402 
403  //file is finished
404  if (currentFile_->bufferPosition_ == currentFile_->fileSize_) {
406  //release last chunk (it is never released elsewhere)
407  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_]);
408  if (currentFile_->nEvents_ >= 0 && currentFile_->nEvents_ != int(currentFile_->nProcessed_)) {
409  throw cms::Exception("FedRawDataInputSource::getNextEvent")
410  << "Fully processed " << currentFile_->nProcessed_ << " from the file " << currentFile_->fileName_
411  << " but according to BU JSON there should be " << currentFile_->nEvents_ << " events";
412  }
413  //try to wake up supervisor thread which might be sleeping waiting for the free chunk
414  if (singleBufferMode_) {
415  std::unique_lock<std::mutex> lkw(mWakeup_);
416  cvWakeup_.notify_one();
417  }
418  bufferInputRead_ = 0;
420  //put the file in pending delete list;
421  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
422  filesToDelete_.push_back(std::pair<int, std::unique_ptr<InputFile>>(currentFileIndex_, std::move(currentFile_)));
423  } else {
424  //in single-thread and stream jobs, events are already processed
425  std::string currentName = currentFile_->fileName_;
426  currentFile_.release();
427  }
429  }
430 
431  //handle RAW file header
432  if (currentFile_->bufferPosition_ == 0 && currentFile_->rawHeaderSize_ > 0) {
433  if (currentFile_->fileSize_ <= currentFile_->rawHeaderSize_) {
434  if (currentFile_->fileSize_ < currentFile_->rawHeaderSize_)
435  throw cms::Exception("FedRawDataInputSource::getNextEvent")
436  << "Premature end of input file while reading file header";
437 
438  edm::LogWarning("FedRawDataInputSource")
439  << "File with only raw header and no events received in LS " << currentFile_->lumi_;
440  if (getLSFromFilename_)
441  if (currentFile_->lumi_ > currentLumiSection_) {
443  eventsThisLumi_ = 0;
445  }
446  }
447 
448  //advance buffer position to skip file header (chunk will be acquired later)
449  currentFile_->chunkPosition_ += currentFile_->rawHeaderSize_;
450  currentFile_->bufferPosition_ += currentFile_->rawHeaderSize_;
451  }
452 
453  //file is too short
454  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < FRDHeaderVersionSize[detectedFRDversion_]) {
455  throw cms::Exception("FedRawDataInputSource::getNextEvent")
456  << "Premature end of input file while reading event header";
457  }
458  if (singleBufferMode_) {
459  //should already be there
460  if (fms_)
462  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
463  usleep(10000);
464  if (currentFile_->parent_->exceptionState() || setExceptionState_)
465  currentFile_->parent_->threadError();
466  }
467  if (fms_)
469 
470  unsigned char* dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
471 
472  //conditions when read amount is not sufficient for the header to fit
476 
477  if (detectedFRDversion_ == 0) {
478  detectedFRDversion_ = *((uint32*)dataPosition);
479  if (detectedFRDversion_ > 5)
480  throw cms::Exception("FedRawDataInputSource::getNextEvent")
481  << "Unknown FRD version -: " << detectedFRDversion_;
483  }
484 
485  //recalculate chunk position
486  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
488  throw cms::Exception("FedRawDataInputSource::getNextEvent")
489  << "Premature end of input file while reading event header";
490  }
491  }
492 
493  event_.reset(new FRDEventMsgView(dataPosition));
494  if (event_->size() > eventChunkSize_) {
495  throw cms::Exception("FedRawDataInputSource::getNextEvent")
496  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
497  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
498  << " bytes";
499  }
500 
501  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
502 
503  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
504  throw cms::Exception("FedRawDataInputSource::getNextEvent")
505  << "Premature end of input file while reading event data";
506  }
507  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
509  //recalculate chunk position
510  dataPosition = currentFile_->chunks_[0]->buf_ + currentFile_->chunkPosition_;
511  event_.reset(new FRDEventMsgView(dataPosition));
512  }
513  currentFile_->bufferPosition_ += event_->size();
514  currentFile_->chunkPosition_ += event_->size();
515  //last chunk is released when this function is invoked next time
516 
517  }
518  //multibuffer mode:
519  else {
520  //wait for the current chunk to become added to the vector
521  if (fms_)
523  while (!currentFile_->waitForChunk(currentFile_->currentChunk_)) {
524  usleep(10000);
525  if (setExceptionState_)
526  threadError();
527  }
528  if (fms_)
530 
531  //check if header is at the boundary of two chunks
532  chunkIsFree_ = false;
533  unsigned char* dataPosition;
534 
535  //read header, copy it to a single chunk if necessary
536  bool chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_]);
537 
538  event_.reset(new FRDEventMsgView(dataPosition));
539  if (event_->size() > eventChunkSize_) {
540  throw cms::Exception("FedRawDataInputSource::getNextEvent")
541  << " event id:" << event_->event() << " lumi:" << event_->lumi() << " run:" << event_->run()
542  << " of size:" << event_->size() << " bytes does not fit into a chunk of size:" << eventChunkSize_
543  << " bytes";
544  }
545 
546  const uint32_t msgSize = event_->size() - FRDHeaderVersionSize[detectedFRDversion_];
547 
548  if (currentFile_->fileSize_ - currentFile_->bufferPosition_ < msgSize) {
549  throw cms::Exception("FedRawDataInputSource::getNextEvent")
550  << "Premature end of input file while reading event data";
551  }
552 
553  if (chunkEnd) {
554  //header was at the chunk boundary, we will have to move payload as well
555  currentFile_->moveToPreviousChunk(msgSize, FRDHeaderVersionSize[detectedFRDversion_]);
556  chunkIsFree_ = true;
557  } else {
558  //header was contiguous, but check if payload fits the chunk
559  if (eventChunkSize_ - currentFile_->chunkPosition_ < msgSize) {
560  //rewind to header start position
562  //copy event to a chunk start and move pointers
563  chunkEnd = currentFile_->advance(dataPosition, FRDHeaderVersionSize[detectedFRDversion_] + msgSize);
564  assert(chunkEnd);
565  chunkIsFree_ = true;
566  //header is moved
567  event_.reset(new FRDEventMsgView(dataPosition));
568  } else {
569  //everything is in a single chunk, only move pointers forward
570  chunkEnd = currentFile_->advance(dataPosition, msgSize);
571  assert(!chunkEnd);
572  chunkIsFree_ = false;
573  }
574  }
575  } //end multibuffer mode
576  if (fms_)
578 
579  if (verifyChecksum_ && event_->version() >= 5) {
580  uint32_t crc = 0;
581  crc = crc32c(crc, (const unsigned char*)event_->payload(), event_->eventSize());
582  if (crc != event_->crc32c()) {
583  if (fms_)
585  throw cms::Exception("FedRawDataInputSource::getNextEvent")
586  << "Found a wrong crc32c checksum: expected 0x" << std::hex << event_->crc32c() << " but calculated 0x"
587  << crc;
588  }
589  } else if (verifyChecksum_ && event_->version() >= 3) {
590  uint32_t adler = adler32(0L, Z_NULL, 0);
591  adler = adler32(adler, (Bytef*)event_->payload(), event_->eventSize());
592 
593  if (adler != event_->adler32()) {
594  if (fms_)
596  throw cms::Exception("FedRawDataInputSource::getNextEvent")
597  << "Found a wrong Adler32 checksum: expected 0x" << std::hex << event_->adler32() << " but calculated 0x"
598  << adler;
599  }
600  }
601  if (fms_)
603 
604  currentFile_->nProcessed_++;
605 
607 }

References cms::cuda::assert(), bufferInputRead_, chunkIsFree_, crc32c(), currentFile_, currentFileIndex_, currentLumiSection_, cvWakeup_, daqDirector_, detectedFRDversion_, event_, eventChunkSize_, eventsThisLumi_, Exception, fileDeleteLock_, fileListMode_, fileQueue_, filesToDelete_, fms_, FRDHeaderVersionSize, freeChunks_, getLSFromFilename_, evf::FastMonitoringThread::inCachedEvent, evf::FastMonitoringThread::inChecksumEvent, evf::FastMonitoringThread::inChunkReceived, evf::FastMonitoringThread::inNewLumi, evf::FastMonitoringThread::inProcessingFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inWaitChunk, evf::FastMonitoringThread::inWaitInput, evf::EvFDaqDirector::isSingleStreamThread(), dttmaxenums::L, maybeOpenNewLumiSection(), eostools::move(), mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, readingFilesCount_, readNextChunkIntoBuffer(), reportEventsThisLumiInSource(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, evf::EvFDaqDirector::sameFile, evf::FastMonitoringService::setExceptionDetected(), setExceptionState_, evf::FastMonitoringService::setInState(), singleBufferMode_, mps_update::status, AlCaHLTBitMon_QueryRunRegistry::string, threadError(), mps_check::timeout, and verifyChecksum_.

Referenced by nextEvent().

◆ initFileList()

long FedRawDataInputSource::initFileList ( )
private

Definition at line 1479 of file FedRawDataInputSource.cc.

// initFileList(): orders fileNames_ and tries to derive a run number from the
// first entry. Returns the parsed run number, or -1 when fileNames_ is empty,
// the stem does not start with "run", or the number cannot be parsed.
// NOTE: listing line 1490 (the declaration of `fileName`) is absent from this
// extract; presumably it is a filesystem path built from fileNames_[0], as the
// comment on line 1489 suggests -- confirm against the real source.
1479  {
// Sort ascending, comparing only the part of each path from the last "/"
// (inclusive) onwards; entries without a "/" are compared whole.
1480  std::sort(fileNames_.begin(), fileNames_.end(), [](std::string a, std::string b) {
1481  if (a.rfind("/") != std::string::npos)
1482  a = a.substr(a.rfind("/"));
1483  if (b.rfind("/") != std::string::npos)
1484  b = b.substr(b.rfind("/"));
1485  return b > a;
1486  });
1487 
1488  if (!fileNames_.empty()) {
1489  //get run number from first file in the vector
1491  std::string fileStem = fileName.stem().string();
// `end` is the position of the first "_" in the stem (npos if there is none).
1492  auto end = fileStem.find("_");
1493  if (fileStem.find("run") == 0) {
// Characters after the "run" prefix up to (not including) the first "_".
// If no "_" exists, end is npos and substr() takes the rest of the stem.
1494  std::string runStr = fileStem.substr(3, end - 3);
1495  try {
1496  //get long to support test run numbers < 2^32
1497  long rval = boost::lexical_cast<long>(runStr);
1498  edm::LogInfo("FedRawDataInputSource") << "Autodetected run number in fileListMode -: " << rval;
1499  return rval;
1500  } catch (boost::bad_lexical_cast const&) {
// A non-numeric run string is only logged; control falls through to return -1.
1501  edm::LogWarning("FedRawDataInputSource")
1502  << "Unable to autodetect run number in fileListMode from file -: " << fileName;
1503  }
1504  }
1505  }
1506  return -1;
1507 }

References a, b, end, MillePedeFileConverter_cfg::fileName, fileNames_, castor_dqm_sourceclient_file_cfg::path, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by FedRawDataInputSource().

◆ maybeOpenNewLumiSection()

void FedRawDataInputSource::maybeOpenNewLumiSection ( const uint32_t  lumiSection)
private

Definition at line 295 of file FedRawDataInputSource.cc.

// maybeOpenNewLumiSection(lumiSection): when there is no current luminosity
// block auxiliary, or its luminosity block number differs from `lumiSection`,
// records the new lumisection in currentLumiSection_ and installs a fresh
// auxiliary stamped with the current wall-clock time. Otherwise does nothing.
// NOTE: listing lines 299, 303, 308, 316 and 322 are absent from this extract.
// The References list for this function names getEoLSFilePathOnFU(),
// lockFULocal2(), unlockFULocal2() and resetLuminosityBlockAuxiliary(), which
// do not appear below, so they are presumably on those lines -- confirm.
295  {
296  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != lumiSection) {
// File-based bookkeeping is only done when the file broker is not in use.
297  if (!useFileBroker_) {
298  if (currentLumiSection_ > 0) {
// `fuEoLS` is declared on the omitted line 299.
300  struct stat buf;
301  bool found = (stat(fuEoLS.c_str(), &buf) == 0);
// Create the fuEoLS file only if stat() could not find it, then create the
// BoLS file for the new lumisection.
302  if (!found) {
// NOTE(review): the descriptor returned by open() is passed to close()
// without checking for -1.
304  int eol_fd =
305  open(fuEoLS.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
306  close(eol_fd);
307  daqDirector_->createBoLSFile(lumiSection, false);
309  }
310  } else
311  daqDirector_->createBoLSFile(lumiSection, true); //needed for initial lumisection
312  }
313 
314  currentLumiSection_ = lumiSection;
315 
317 
// Build the open time as seconds * 1e6 + microseconds from gettimeofday().
318  timeval tv;
319  gettimeofday(&tv, nullptr);
320  const edm::Timestamp lsopentime((unsigned long long)tv.tv_sec * 1000000 + (unsigned long long)tv.tv_usec);
321 
// `lumiBlockAuxiliary` is declared on the omitted line 322; line 323 shows its
// constructor arguments (run, lumisection, begin time, invalid end time).
323  runAuxiliary()->run(), lumiSection, lsopentime, edm::Timestamp::invalidTimestamp());
324 
325  setLuminosityBlockAuxiliary(lumiBlockAuxiliary);
326  luminosityBlockAuxiliary()->setProcessHistoryID(processHistoryID_);
327 
328  edm::LogInfo("FedRawDataInputSource") << "New lumi section was opened. LUMI -: " << lumiSection;
329  }
330 }

References visDQMUpload::buf, evf::EvFDaqDirector::createBoLSFile(), currentLumiSection_, daqDirector_, newFWLiteAna::found, evf::EvFDaqDirector::getEoLSFilePathOnFU(), edm::Timestamp::invalidTimestamp(), evf::EvFDaqDirector::lockFULocal2(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), processHistoryID_, edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::InputSource::setLuminosityBlockAuxiliary(), hgcalPlots::stat, AlCaHLTBitMon_QueryRunRegistry::string, evf::EvFDaqDirector::unlockFULocal2(), and useFileBroker_.

Referenced by checkNext(), and getNextEvent().

◆ nextEvent()

evf::EvFDaqDirector::FileStatus FedRawDataInputSource::nextEvent ( )
inline, private

Definition at line 332 of file FedRawDataInputSource.cc.

// nextEvent(): returns a FileStatus held in `status`, leaving its loop early
// when edm::shutdown_flag is set.
// NOTE: listing lines 333-334 (the declaration of `status` and the loop
// header) are absent from this extract. The References list names
// getNextEvent() and evf::EvFDaqDirector::noFile, so the loop presumably keeps
// calling getNextEvent() while it reports noFile -- confirm against the source.
332  {
// Relaxed load: the flag is only polled here.
335  if (edm::shutdown_flag.load(std::memory_order_relaxed))
336  break;
337  }
338  return status;
339 }

References getNextEvent(), svgfig::load(), evf::EvFDaqDirector::noFile, edm::shutdown_flag, and mps_update::status.

Referenced by checkNext().

◆ read()

void FedRawDataInputSource::read ( edm::EventPrincipal eventPrincipal)
override, protected, virtual

Implements edm::RawInputSource.

Definition at line 609 of file FedRawDataInputSource.cc.

// read(eventPrincipal): builds the event for the current raw event, records
// which input file the stream is using, periodically drops fully processed
// files from filesToDelete_, and returns a spare chunk to freeChunks_.
// NOTE: many listing lines are absent from this extract (611, 613, 616-617,
// 621-623, 629-632, 635, 642, 646, 679). In particular the declarations of
// `aux`, the filling of `rawData` and the put of `edp` are not visible. The
// References list names fillFEDRawDataCollection(), makeEventAuxiliary() and
// EventPrincipal::put(), presumably on those lines -- confirm.
609  {
610  if (fms_)
612  std::unique_ptr<FEDRawDataCollection> rawData(new FEDRawDataCollection);
614 
// Three ways of obtaining the event auxiliary `aux` (construction omitted from
// the listing); each branch then sets the process history ID and calls
// makeEvent().
615  if (useL1EventID_) {
618  aux.setProcessHistoryID(processHistoryID_);
619  makeEvent(eventPrincipal, aux);
620  } else if (tcds_pointer_ == nullptr) {
624  aux.setProcessHistoryID(processHistoryID_);
625  makeEvent(eventPrincipal, aux);
626  } else {
// The TCDS record is read from the bytes following the FED header.
627  const FEDHeader fedHeader(tcds_pointer_);
628  tcds::Raw_v1 const* tcds = reinterpret_cast<tcds::Raw_v1 const*>(tcds_pointer_ + FEDHeader::length);
633  static_cast<edm::EventAuxiliary::ExperimentType>(fedHeader.triggerType()),
634  processGUID(),
636  aux.setProcessHistoryID(processHistoryID_);
637  makeEvent(eventPrincipal, aux);
638  }
639 
// Ownership of rawData moves into the wrapper `edp`.
640  std::unique_ptr<edm::WrapperBase> edp(new edm::Wrapper<FEDRawDataCollection>(std::move(rawData)));
641 
643 
644  eventsThisLumi_++;
645  if (fms_)
647 
// Grow the per-stream tracker with -1 entries until this stream ID is a valid
// index, then store the index of the file this event came from.
648  //resize vector if needed
649  while (streamFileTracker_.size() <= eventPrincipal.streamID())
650  streamFileTracker_.push_back(-1);
651 
652  streamFileTracker_[eventPrincipal.streamID()] = currentFileIndex_;
653 
// The actual period is checkEvery_ (its value is not shown in this listing):
// the cleanup runs when (nProcessed_ - 1) is a multiple of checkEvery_.
654  //this old file check runs no more often than every 10 events
655  if (!((currentFile_->nProcessed_ - 1) % (checkEvery_))) {
656  //delete files that are not in processing
657  std::unique_lock<std::mutex> lkw(fileDeleteLock_);
658  auto it = filesToDelete_.begin();
659  while (it != filesToDelete_.end()) {
660  bool fileIsBeingProcessed = false;
// NOTE(review): .at(i) throws std::out_of_range if streamFileTracker_ has fewer
// than nStreams_ entries; above it is only grown on demand -- confirm it is
// pre-sized elsewhere.
661  for (unsigned int i = 0; i < nStreams_; i++) {
662  if (it->first == streamFileTracker_.at(i)) {
663  fileIsBeingProcessed = true;
664  break;
665  }
666  }
667  if (!fileIsBeingProcessed) {
// `fileToDelete` is not used afterwards in the visible code. Erasing the list
// entry destroys the unique_ptr<InputFile> it holds.
668  std::string fileToDelete = it->second->fileName_;
669  //it->second.release();
670  it = filesToDelete_.erase(it);
671  } else
672  it++;
673  }
674  }
// When chunkIsFree_ is set, the chunk before the current one is handed back to
// the free-chunk queue and the flag is cleared.
675  if (chunkIsFree_)
676  freeChunks_.push(currentFile_->chunks_[currentFile_->currentChunk_ - 1]);
677  chunkIsFree_ = false;
678  if (fms_)
680  return;
681 }

References cms::cuda::assert(), printConversionInfo::aux, edm::DaqProvenanceHelper::branchDescription(), checkEvery_, chunkIsFree_, currentFile_, currentFileIndex_, currentLumiSection_, daqProvenanceHelper_, edm::DaqProvenanceHelper::dummyProvenance(), eventID_, eventRunNumber_, eventsThisLumi_, fileDeleteLock_, fileListLoopMode_, filesToDelete_, fillFEDRawDataCollection(), fms_, freeChunks_, GTPEventID_, mps_fire::i, evf::FastMonitoringThread::inNoRequest, evf::FastMonitoringThread::inReadCleanup, evf::FastMonitoringThread::inReadEvent, L1EventID_, FEDHeader::length, edm::RawInputSource::makeEvent(), evf::evtn::makeEventAuxiliary(), eostools::move(), nStreams_, edm::EventAuxiliary::PhysicsTrigger, edm::InputSource::processGUID(), processHistoryID_, edm::EventPrincipal::put(), l1tstage2_dqm_sourceclient-live_cfg::rawData, evf::FastMonitoringService::setInState(), streamFileTracker_, edm::EventPrincipal::streamID(), AlCaHLTBitMon_QueryRunRegistry::string, tcds_pointer_, FEDHeader::triggerType(), and useL1EventID_.

Referenced by edmIntegrityCheck.PublishToFileSystem::get(), readNextChunkIntoBuffer(), and readWorker().

◆ readNextChunkIntoBuffer()

void FedRawDataInputSource::readNextChunkIntoBuffer ( InputFile file)
private

Definition at line 1401 of file FedRawDataInputSource.cc.

// readNextChunkIntoBuffer(file): refills file->chunks_[0] from the file,
// opening it first when no descriptor is active, and closes the descriptor
// once bufferInputRead_ equals the file size.
// NOTE: listing lines 1427, 1440 and 1445 are absent from this extract. They
// directly follow the ::read() calls, and bufferInputRead_ is compared with
// the file size below, so they presumably add `last` to bufferInputRead_ --
// confirm against the real source.
// NOTE(review): in the visible lines the result of ::read() is added to the
// running offsets without a check for -1.
1401  {
1402  uint32_t existingSize = 0;
1403 
// No descriptor yet: either open the file by name or take over file->rawFd_.
1404  if (fileDescriptor_ < 0) {
1405  bufferInputRead_ = 0;
1406  if (file->rawFd_ == -1)
1407  fileDescriptor_ = open(file->fileName_.c_str(), O_RDONLY);
1408  else {
1409  fileDescriptor_ = file->rawFd_;
1410  //skip header size in destination buffer (chunk position was already adjusted)
1411  bufferInputRead_ += file->rawHeaderSize_;
1412  existingSize += file->rawHeaderSize_;
1413  }
1414  //off_t pos = lseek(fileDescriptor,0,SEEK_SET);
1415  if (fileDescriptor_ >= 0)
1416  LogDebug("FedRawDataInputSource") << "opened file -: " << std::endl << file->fileName_;
1417  else {
1418  throw cms::Exception("FedRawDataInputSource:readNextChunkIntoBuffer")
1419  << "failed to open file " << std::endl
1420  << file->fileName_ << " fd:" << fileDescriptor_;
1421  }
1422  }
1423 
// Chunk position 0: fill the buffer with readBlocks_ reads of eventChunkBlock_
// bytes each, appending after any bytes already counted in existingSize.
1424  if (file->chunkPosition_ == 0) { //in the rare case the last byte barely fit
1425  for (unsigned int i = 0; i < readBlocks_; i++) {
1426  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSize), eventChunkBlock_);
1428  existingSize += last;
1429  }
1430  } else {
// Otherwise move the unconsumed tail (eventChunkSize_ - chunkPosition_ bytes)
// to the start of the buffer and read chunkPosition_ more bytes behind it:
// `blockcount` full blocks plus one partial read of `leftsize` bytes.
1431  const uint32_t chunksize = file->chunkPosition_;
1432  const uint32_t blockcount = chunksize / eventChunkBlock_;
1433  const uint32_t leftsize = chunksize % eventChunkBlock_;
1434  uint32_t existingSizeLeft = eventChunkSize_ - file->chunkPosition_;
1435  memmove((void*)file->chunks_[0]->buf_, file->chunks_[0]->buf_ + file->chunkPosition_, existingSizeLeft);
1436 
1437  for (uint32_t i = 0; i < blockcount; i++) {
1438  const ssize_t last =
1439  ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), eventChunkBlock_);
1441  existingSizeLeft += last;
1442  }
1443  if (leftsize) {
1444  const ssize_t last = ::read(fileDescriptor_, (void*)(file->chunks_[0]->buf_ + existingSizeLeft), leftsize);
1446  }
1447  file->chunkPosition_ = 0; //data was moved to beginning of the chunk
1448  }
// Whole file consumed: close the descriptor and reset both handles to -1.
1449  if (bufferInputRead_ == file->fileSize_) { // no more data in this file
1450  if (fileDescriptor_ != -1) {
1451  LogDebug("FedRawDataInputSource") << "Closing input file -: " << std::endl << file->fileName_;
1452  close(fileDescriptor_);
1453  file->rawFd_ = fileDescriptor_ = -1;
1454  }
1455  }
1456 }

References bufferInputRead_, eventChunkBlock_, eventChunkSize_, Exception, FrontierConditions_GlobalTag_cff::file, fileDescriptor_, mps_fire::i, dqmdumpme::last, LogDebug, read(), and readBlocks_.

Referenced by getNextEvent().

◆ readSupervisor()

void FedRawDataInputSource::readSupervisor ( )
private

Definition at line 737 of file FedRawDataInputSource.cc.

// readSupervisor(): loop that finds input files and queues them for reading.
// NOTE: many listing lines are absent from this extract (for example 756,
// 763-764, 802-804, 807, 816, 827-828, 859, 863, 879, 889, 897, 954, 973,
// 1034), so several statements below appear without their condition or
// continuation. Comments describe only what the visible lines show.
// Visible flow of one outer iteration:
//   1. wait, in 100 ms timed waits on cvWakeup_, while no worker thread
//      (outside single-buffer mode) or no free chunk is available;
//   2. poll for the next file through getFile(), updateFuLock() or
//      getNextFromFileBroker();
//   3. push marker InputFile objects (newLumi, runEnded, runAbort) to fileQueue_;
//   4. stat the raw file, create its InputFile and either hand each chunk to
//      a worker thread (multi-buffer) or attach a single chunk (single-buffer).
// After the loop every worker thread is told to quit and is joined.
737  {
738  bool stop = false;
739  unsigned int currentLumiSection = 0;
740  //threadInit_.exchange(true,std::memory_order_acquire);
741 
// Notify one waiter on startupCv_ while holding startupLock_.
742  {
743  std::unique_lock<std::mutex> lk(startupLock_);
744  startupCv_.notify_one();
745  }
746 
// Lock-wait statistics are accumulated in lockCount / sumLockWaitTimeUs and
// reported through fms_->reportLockWait() when `ls` exceeds monLS.
747  uint32_t ls = 0;
748  uint32_t monLS = 1;
749  uint32_t lockCount = 0;
750  uint64_t sumLockWaitTimeUs = 0.;
751 
752  while (!stop) {
753  //wait for at least one free thread and chunk
754  int counter = 0;
// The rest of this condition is on the omitted line 756; the References list
// names readingFilesCount_ and maxBufferedFiles_, presumably used there.
755  while ((workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty() ||
757  //report state to monitoring
758  if (fms_) {
// copy_active is true when any entry of tid_active_ is set.
759  bool copy_active = false;
760  for (auto j : tid_active_)
761  if (j)
762  copy_active = true;
765  else if (freeChunks_.empty()) {
766  if (copy_active)
768  else
770  } else {
771  if (copy_active)
773  else
775  }
776  }
777  std::unique_lock<std::mutex> lkw(mWakeup_);
778  //sleep until woken up by condition or a timeout
779  if (cvWakeup_.wait_for(lkw, std::chrono::milliseconds(100)) == std::cv_status::timeout) {
780  counter++;
781  //if (!(counter%50)) edm::LogInfo("FedRawDataInputSource") << "No free chunks or threads...";
782  LogDebug("FedRawDataInputSource") << "No free chunks or threads...";
783  } else {
784  assert(!(workerPool_.empty() && !singleBufferMode_) || freeChunks_.empty());
785  }
786  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
787  stop = true;
788  break;
789  }
790  }
791  //if this is reached, there are enough buffers and threads to proceed or processing is instructed to stop
792 
793  if (stop)
794  break;
795 
796  //look for a new file
797  std::string nextFile;
798  uint32_t fileSizeIndex;
799  int64_t fileSizeFromMetadata;
800 
801  if (fms_) {
805  }
806 
// `status` is declared on the omitted line 807.
808  uint16_t rawHeaderSize = 0;
809  uint32_t lsFromRaw = 0;
810  int32_t serverEventsInNewFile = -1;
811  int rawFd = -1;
812 
813  int backoff_exp = 0;
814 
// The loop header is on the omitted line 816.
815  //entering loop which tries to grab new file from ramdisk
817  if (quit_threads_.load(std::memory_order_relaxed) || edm::shutdown_flag.load(std::memory_order_relaxed)) {
818  stop = true;
819  break;
820  }
821 
822  assert(rawFd == -1);
823  uint64_t thisLockWaitTimeUs = 0.;
// Three sources for the next file: the local file list, the FU lock file, or
// the file broker.
824  if (fileListMode_) {
825  //return LS if LS not set, otherwise return file
826  status = getFile(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
// Lines 827-828 are omitted; the lines below are the trailing arguments of a
// call whose non-zero result is treated as an error. The References list
// names parseFRDFileHeader(), presumably this call -- confirm.
829  rawFd,
830  rawHeaderSize,
831  lsFromRaw,
832  serverEventsInNewFile,
833  fileSizeFromMetadata,
834  false,
835  false,
836  false) != 0) {
837  //error
838  setExceptionState_ = true;
839  stop = true;
840  break;
841  }
842  if (!getLSFromFilename_)
843  ls = lsFromRaw;
844  }
845  } else if (!useFileBroker_)
846  status = daqDirector_->updateFuLock(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
847  else {
848  status = daqDirector_->getNextFromFileBroker(currentLumiSection,
849  ls,
850  nextFile,
851  rawFd,
852  rawHeaderSize,
853  serverEventsInNewFile,
854  fileSizeFromMetadata,
855  thisLockWaitTimeUs);
856  }
857 
858  if (fms_)
860 
861  //cycle through all remaining LS even if no files get assigned
862  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
864 
// Only the sum is conditional on a non-zero wait; lockCount is incremented on
// every pass.
865  //monitoring of lock wait time
866  if (thisLockWaitTimeUs > 0.)
867  sumLockWaitTimeUs += thisLockWaitTimeUs;
868  lockCount++;
869  if (ls > monLS) {
870  monLS = ls;
871  if (lockCount)
872  if (fms_)
873  fms_->reportLockWait(monLS, sumLockWaitTimeUs, lockCount);
874  lockCount = 0;
875  sumLockWaitTimeUs = 0;
876  }
877 
// The guarding condition is on the omitted line 879.
878  //check again for any remaining index/EoLS files after EoR file is seen
880  if (fms_)
882  usleep(100000);
883  //now all files should have appeared in ramdisk, check again if any raw files were left behind
884  status = daqDirector_->updateFuLock(ls, nextFile, fileSizeIndex, thisLockWaitTimeUs);
885  if (currentLumiSection != ls && status == evf::EvFDaqDirector::runEnded)
887  }
888 
// Condition on the omitted line 889: queue a runEnded marker and stop.
890  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runEnded));
891  fileQueue_.push(std::move(inf));
892  stop = true;
893  break;
894  }
895 
// Condition on the omitted line 897: queue a runAbort marker and stop.
896  //error from filelocking function
898  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
899  fileQueue_.push(std::move(inf));
900  stop = true;
901  break;
902  }
903  //queue new lumisection
904  if (getLSFromFilename_) {
905  if (ls > currentLumiSection) {
906  if (!useFileBroker_) {
907  //file locking
908  //fms_->setInStateSup(evf::FastMonitoringThread::inSupNewLumi);
909  currentLumiSection = ls;
910  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, currentLumiSection));
911  fileQueue_.push(std::move(inf));
912  } else {
913  //new file service
914  if (currentLumiSection == 0 && !alwaysStartFromFirstLS_) {
915  if (ls < 100) {
916  //look at last LS file on disk to start from that lumisection (only within first 100 LS)
917  unsigned int lsToStart = daqDirector_->getLumisectionToStart();
918 
919  for (unsigned int nextLS = lsToStart; nextLS <= ls; nextLS++) {
920  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
921  fileQueue_.push(std::move(inf));
922  }
923  } else {
924  //start from current LS
925  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, ls));
926  fileQueue_.push(std::move(inf));
927  }
928  } else {
929  //queue all lumisections after last one seen to avoid gaps
930  for (unsigned int nextLS = currentLumiSection + 1; nextLS <= ls; nextLS++) {
931  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::newLumi, nextLS));
932  fileQueue_.push(std::move(inf));
933  }
934  }
935  currentLumiSection = ls;
936  }
937  }
// A lumisection lower than the current one is treated as fatal: close any
// inherited descriptor, queue a runAbort marker and stop.
938  //else
939  if (currentLumiSection > 0 && ls < currentLumiSection) {
940  edm::LogError("FedRawDataInputSource")
941  << "Got old LS (" << ls << ") file from EvFDAQDirector! Expected LS:" << currentLumiSection
942  << ". Aborting execution." << std::endl;
943  if (rawFd != -1)
944  close(rawFd);
945  rawFd = -1;
946  std::unique_ptr<InputFile> inf(new InputFile(evf::EvFDaqDirector::runAbort, 0));
947  fileQueue_.push(std::move(inf));
948  stop = true;
949  break;
950  }
951  }
952 
// Condition on the omitted line 954. Sleep before retrying: a fixed 100 ms
// without the file broker, otherwise 100 ms * 2^backoff_exp with the exponent
// clamped to 4.
953  int dbgcount = 0;
955  if (fms_)
957  dbgcount++;
958  if (!(dbgcount % 20))
959  LogDebug("FedRawDataInputSource") << "No file for me... sleep and try again...";
960  if (!useFileBroker_)
961  usleep(100000);
962  else {
963  backoff_exp = std::min(4, backoff_exp); // max 1.6 seconds
964  //backoff_exp=0; // disabled!
965  int sleeptime = (int)(100000. * pow(2, backoff_exp));
966  usleep(sleeptime);
967  backoff_exp++;
968  }
969  } else
970  backoff_exp = 0;
971  }
// The condition opening this section is on the omitted line 973.
972  //end of file grab loop, parse result
974  if (fms_)
976  LogDebug("FedRawDataInputSource") << "The director says to grab -: " << nextFile;
977 
978  std::string rawFile;
979  //file service will report raw extension
980  if (useFileBroker_)
981  rawFile = nextFile;
982  else {
983  boost::filesystem::path rawFilePath(nextFile);
984  rawFile = rawFilePath.replace_extension(".raw").string();
985  }
986 
// A failed stat() sets the exception flag and leaves the outer loop; `stop`
// itself is not set here.
987  struct stat st;
988  int stat_res = stat(rawFile.c_str(), &st);
989  if (stat_res == -1) {
990  edm::LogError("FedRawDataInputSource") << "Can not stat file (" << errno << "):-" << rawFile << std::endl;
991  setExceptionState_ = true;
992  break;
993  }
994  uint64_t fileSize = st.st_size;
995 
996  if (fms_) {
1000  }
// Event count: in file-list mode 0 for an empty file and -1 otherwise; in the
// other modes it comes from the JSON file or from the broker reply.
1001  int eventsInNewFile;
1002  if (fileListMode_) {
1003  if (fileSize == 0)
1004  eventsInNewFile = 0;
1005  else
1006  eventsInNewFile = -1;
1007  } else {
1009  if (!useFileBroker_)
1010  eventsInNewFile = daqDirector_->grabNextJsonFileAndUnlock(nextFile);
1011  else
1012  eventsInNewFile = serverEventsInNewFile;
1013  assert(eventsInNewFile >= 0);
1014  assert((eventsInNewFile > 0) ==
1015  (fileSize > rawHeaderSize)); //file without events must be empty or contain only header
1016  }
1017 
1018  if (!singleBufferMode_) {
// Number of chunks is fileSize / eventChunkSize_, rounded up.
1019  //calculate number of needed chunks
1020  unsigned int neededChunks = fileSize / eventChunkSize_;
1021  if (fileSize % eventChunkSize_)
1022  neededChunks++;
1023 
1024  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1025  ls,
1026  rawFile,
1027  !fileListMode_,
1028  rawFd,
1029  fileSize,
1030  rawHeaderSize,
1031  neededChunks,
1032  eventsInNewFile,
1033  this));
// Keep a raw pointer for the worker jobs, since ownership moves into
// fileQueue_ on the next line.
1035  auto newInputFilePtr = newInputFile.get();
1036  fileQueue_.push(std::move(newInputFile));
1037 
// For every chunk: obtain a worker thread id and a free chunk (polling every
// 100 ms), then hand the (file, chunk) job to that worker.
1038  for (unsigned int i = 0; i < neededChunks; i++) {
1039  if (fms_) {
1040  bool copy_active = false;
1041  for (auto j : tid_active_)
1042  if (j)
1043  copy_active = true;
1044  if (copy_active)
1046  else
1048  }
1049  //get thread
1050  unsigned int newTid = 0xffffffff;
// These inner waits check only quit_threads_, not edm::shutdown_flag.
1051  while (!workerPool_.try_pop(newTid)) {
1052  usleep(100000);
1053  if (quit_threads_.load(std::memory_order_relaxed)) {
1054  stop = true;
1055  break;
1056  }
1057  }
1058 
1059  if (fms_) {
1060  bool copy_active = false;
1061  for (auto j : tid_active_)
1062  if (j)
1063  copy_active = true;
1064  if (copy_active)
1066  else
1068  }
1069  InputChunk* newChunk = nullptr;
1070  while (!freeChunks_.try_pop(newChunk)) {
1071  usleep(100000);
1072  if (quit_threads_.load(std::memory_order_relaxed)) {
1073  stop = true;
1074  break;
1075  }
1076  }
1077 
1078  if (newChunk == nullptr) {
1079  //return unused tid if we received shutdown (nullptr chunk)
1080  if (newTid != 0xffffffff)
1081  workerPool_.push(newTid);
1082  stop = true;
1083  break;
1084  }
1085  if (stop)
1086  break;
1087  if (fms_)
1089 
// mReader_ is held from here to the end of this loop iteration.
1090  std::unique_lock<std::mutex> lk(mReader_);
1091 
// The last chunk reads only the remainder when fileSize is not a multiple of
// eventChunkSize_.
1092  unsigned int toRead = eventChunkSize_;
1093  if (i == neededChunks - 1 && fileSize % eventChunkSize_)
1094  toRead = fileSize % eventChunkSize_;
1095  newChunk->reset(i * eventChunkSize_, toRead, i);
1096 
1097  workerJob_[newTid].first = newInputFilePtr;
1098  workerJob_[newTid].second = newChunk;
1099 
1100  //wake up the worker thread
1101  cvReader_[newTid]->notify_one();
1102  }
1103  } else {
1104  if (!eventsInNewFile) {
// NOTE(review): this tests rawFd for non-zero, so rawFd == -1 also reaches
// close(); line 943 above uses `rawFd != -1` for the same purpose.
1105  if (rawFd) {
1106  close(rawFd);
1107  rawFd = -1;
1108  }
1109  //still queue file for lumi update
1110  std::unique_lock<std::mutex> lkw(mWakeup_);
1111  //TODO: also file with only file header fits in this edge case. Check if read correctly in single buffer mode
1112  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1113  ls,
1114  rawFile,
1115  !fileListMode_,
1116  rawFd,
1117  fileSize,
1118  rawHeaderSize,
1119  (rawHeaderSize > 0),
1120  0,
1121  this));
1123  fileQueue_.push(std::move(newInputFile));
1124  cvWakeup_.notify_one();
1125  break;
1126  }
1127  //in single-buffer mode put single chunk in the file and let the main thread read the file
// NOTE(review): newChunk is uninitialized; if the wait below exits through the
// quit_threads_ break, newChunk->reset() is called on an indeterminate pointer.
1128  InputChunk* newChunk;
1129  //should be available immediately
1130  while (!freeChunks_.try_pop(newChunk)) {
1131  usleep(100000);
1132  if (quit_threads_.load(std::memory_order_relaxed))
1133  break;
1134  }
1135 
1136  std::unique_lock<std::mutex> lkw(mWakeup_);
1137 
1138  unsigned int toRead = eventChunkSize_;
1139  if (fileSize % eventChunkSize_)
1140  toRead = fileSize % eventChunkSize_;
1141  newChunk->reset(0, toRead, 0);
1142  newChunk->readComplete_ = true;
1143 
1144  //push file and wakeup main thread
1145  std::unique_ptr<InputFile> newInputFile(new InputFile(evf::EvFDaqDirector::FileStatus::newFile,
1146  ls,
1147  rawFile,
1148  !fileListMode_,
1149  rawFd,
1150  fileSize,
1151  rawHeaderSize,
1152  1,
1153  eventsInNewFile,
1154  this));
1155  newInputFile->chunks_[0] = newChunk;
1157  fileQueue_.push(std::move(newInputFile));
1158  cvWakeup_.notify_one();
1159  }
1160  }
1161  }
1162  if (fms_)
// Shutdown: pop each worker id from workerPool_, set its quit signal, wake it,
// then join and delete every worker thread object.
1164  //make sure threads finish reading
1165  unsigned numFinishedThreads = 0;
1166  while (numFinishedThreads < workerThreads_.size()) {
1167  unsigned int tid;
1168  while (!workerPool_.try_pop(tid)) {
1169  usleep(10000);
1170  }
1171  std::unique_lock<std::mutex> lk(mReader_);
1172  thread_quit_signal[tid] = true;
1173  cvReader_[tid]->notify_one();
1174  numFinishedThreads++;
1175  }
1176  for (unsigned int i = 0; i < workerThreads_.size(); i++) {
1177  workerThreads_[i]->join();
1178  delete workerThreads_[i];
1179  }
1180 }

References alwaysStartFromFirstLS_, cms::cuda::assert(), cvReader_, cvWakeup_, daqDirector_, relativeConstraints::empty, eventChunkSize_, fileListMode_, fileQueue_, fms_, freeChunks_, getFile(), getLSFromFilename_, evf::EvFDaqDirector::getLumisectionToStart(), evf::EvFDaqDirector::getNextFromFileBroker(), evf::EvFDaqDirector::grabNextJsonFileAndUnlock(), mps_fire::i, dqmiodatasetharvest::inf, InputFile, evf::FastMonitoringThread::inRunEnd, evf::FastMonitoringThread::inSupBusy, evf::FastMonitoringThread::inSupFileLimit, evf::FastMonitoringThread::inSupLockPolling, evf::FastMonitoringThread::inSupNewFile, evf::FastMonitoringThread::inSupNewFileWaitChunk, evf::FastMonitoringThread::inSupNewFileWaitChunkCopying, evf::FastMonitoringThread::inSupNewFileWaitThread, evf::FastMonitoringThread::inSupNewFileWaitThreadCopying, evf::FastMonitoringThread::inSupNoFile, evf::FastMonitoringThread::inSupWaitFreeChunk, evf::FastMonitoringThread::inSupWaitFreeChunkCopying, evf::FastMonitoringThread::inSupWaitFreeThread, evf::FastMonitoringThread::inSupWaitFreeThreadCopying, createfilelist::int, dqmiolumiharvest::j, LogDebug, eostools::ls(), maxBufferedFiles_, min(), eostools::move(), mReader_, mWakeup_, evf::EvFDaqDirector::newFile, evf::EvFDaqDirector::newLumi, evf::EvFDaqDirector::noFile, evf::EvFDaqDirector::parseFRDFileHeader(), castor_dqm_sourceclient_file_cfg::path, funct::pow(), quit_threads_, InputChunk::readComplete_, readingFilesCount_, evf::FastMonitoringService::reportLockWait(), InputChunk::reset(), evf::EvFDaqDirector::runAbort, evf::EvFDaqDirector::runEnded, setExceptionState_, evf::FastMonitoringService::setInStateSup(), edm::shutdown_flag, singleBufferMode_, evf::FastMonitoringService::startedLookingForFile(), startupCv_, startupLock_, hgcalPlots::stat, mps_update::status, evf::FastMonitoringService::stoppedLookingForFile(), AlCaHLTBitMon_QueryRunRegistry::string, thread_quit_signal, tid_active_, mps_check::timeout, evf::EvFDaqDirector::updateFuLock(), useFileBroker_, 
workerJob_, workerPool_, and workerThreads_.

Referenced by checkNext().

◆ readWorker()

void FedRawDataInputSource::readWorker ( unsigned int  tid)
private

Definition at line 1182 of file FedRawDataInputSource.cc.

1182  {
1183  bool init = true;
1184  threadInit_.exchange(true, std::memory_order_acquire);
1185 
1186  while (true) {
1187  tid_active_[tid] = false;
1188  std::unique_lock<std::mutex> lk(mReader_);
1189  workerJob_[tid].first = nullptr;
1190  workerJob_[tid].first = nullptr;
1191 
1192  assert(!thread_quit_signal[tid]); //should never get it here
1193  workerPool_.push(tid);
1194 
1195  if (init) {
1196  std::unique_lock<std::mutex> lk(startupLock_);
1197  init = false;
1198  startupCv_.notify_one();
1199  }
1200  cvReader_[tid]->wait(lk);
1201 
1202  if (thread_quit_signal[tid])
1203  return;
1204  tid_active_[tid] = true;
1205 
1206  InputFile* file;
1207  InputChunk* chunk;
1208 
1209  assert(workerJob_[tid].first != nullptr && workerJob_[tid].second != nullptr);
1210 
1211  file = workerJob_[tid].first;
1212  chunk = workerJob_[tid].second;
1213 
1214  //skip reading initial header size in first chunk if inheriting file descriptor (already set at appropriate position)
1215  unsigned int bufferLeft = (chunk->offset_ == 0 && file->rawFd_ != 0) ? file->rawHeaderSize_ : 0;
1216 
1217  //if only one worker thread exists, use single fd for all operations
1218  //if more worker threads exist, use rawFd_ for only the first read operation and then close file
1219  int fileDescriptor;
1220  bool fileOpenedHere = false;
1221 
1222  if (numConcurrentReads_ == 1) {
1223  fileDescriptor = file->rawFd_;
1224  if (fileDescriptor == -1) {
1225  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1226  fileOpenedHere = true;
1227  file->rawFd_ = fileDescriptor;
1228  }
1229  } else {
1230  if (chunk->offset_ == 0) {
1231  fileDescriptor = file->rawFd_;
1232  file->rawFd_ = -1;
1233  if (fileDescriptor == -1) {
1234  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1235  fileOpenedHere = true;
1236  }
1237  } else {
1238  fileDescriptor = open(file->fileName_.c_str(), O_RDONLY);
1239  fileOpenedHere = true;
1240  }
1241  }
1242 
1243  if (fileDescriptor < 0) {
1244  edm::LogError("FedRawDataInputSource") << "readWorker failed to open file -: " << file->fileName_
1245  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1246  setExceptionState_ = true;
1247  continue;
1248  }
1249  if (fileOpenedHere) { //fast forward to this chunk position
1250  off_t pos = 0;
1251  pos = lseek(fileDescriptor, chunk->offset_, SEEK_SET);
1252  if (pos == -1) {
1253  edm::LogError("FedRawDataInputSource")
1254  << "readWorker failed to seek file -: " << file->fileName_ << " fd:" << fileDescriptor << " to offset "
1255  << chunk->offset_ << " error: " << strerror(errno);
1256  setExceptionState_ = true;
1257  continue;
1258  }
1259  }
1260 
1261  LogDebug("FedRawDataInputSource") << "Reader thread opened file -: TID: " << tid << " file: " << file->fileName_
1262  << " at offset " << lseek(fileDescriptor, 0, SEEK_CUR);
1263 
1264  unsigned int skipped = bufferLeft;
1265  auto start = std::chrono::high_resolution_clock::now();
1266  for (unsigned int i = 0; i < readBlocks_; i++) {
1267  ssize_t last;
1268 
1269  //protect against reading into next block
1270  last = ::read(
1271  fileDescriptor, (void*)(chunk->buf_ + bufferLeft), std::min(chunk->usedSize_ - bufferLeft, eventChunkBlock_));
1272 
1273  if (last < 0) {
1274  edm::LogError("FedRawDataInputSource") << "readWorker failed to read file -: " << file->fileName_
1275  << " fd:" << fileDescriptor << " error: " << strerror(errno);
1276  setExceptionState_ = true;
1277  break;
1278  }
1279  if (last > 0)
1280  bufferLeft += last;
1281  if (last < eventChunkBlock_) { //last read
1282  //check if this is last block, then total read size must match file size
1283  if (!(chunk->usedSize_ - skipped == i * eventChunkBlock_ + last)) {
1284  edm::LogError("FedRawDataInputSource")
1285  << "readWorker failed to read file -: " << file->fileName_ << " fd:" << fileDescriptor << " last:" << last
1286  << " expectedChunkSize:" << chunk->usedSize_
1287  << " readChunkSize:" << (skipped + i * eventChunkBlock_ + last) << " skipped:" << skipped
1288  << " block:" << (i + 1) << "/" << readBlocks_ << " error: " << strerror(errno);
1289  setExceptionState_ = true;
1290  }
1291  break;
1292  }
1293  }
1294  if (setExceptionState_)
1295  continue;
1296 
1297  auto end = std::chrono::high_resolution_clock::now();
1298  auto diff = end - start;
1299  std::chrono::milliseconds msec = std::chrono::duration_cast<std::chrono::milliseconds>(diff);
1300  LogDebug("FedRawDataInputSource") << " finished reading block -: " << (bufferLeft >> 20) << " MB"
1301  << " in " << msec.count() << " ms (" << (bufferLeft >> 20) / double(msec.count())
1302  << " GB/s)";
1303 
1304  if (chunk->offset_ + bufferLeft == file->fileSize_) { //file reading finished using same fd
1305  close(fileDescriptor);
1306  fileDescriptor = -1;
1307  if (numConcurrentReads_ == 1)
1308  file->rawFd_ = -1;
1309  }
1310  if (numConcurrentReads_ > 1 && fileDescriptor != -1)
1311  close(fileDescriptor);
1312 
1313  //detect FRD event version. Skip file Header if it exists
1314  if (detectedFRDversion_ == 0 && chunk->offset_ == 0) {
1315  detectedFRDversion_ = *((uint32*)(chunk->buf_ + file->rawHeaderSize_));
1316  }
1318  chunk->readComplete_ =
1319  true; //this is atomic to secure the sequential buffer fill before becoming available for processing)
1320  file->chunks_[chunk->fileIndex_] = chunk; //put the completed chunk in the file chunk vector at predetermined index
1321  }
1322 }

References cms::cuda::assert(), InputChunk::buf_, cvReader_, detectedFRDversion_, change_name::diff, end, eventChunkBlock_, FrontierConditions_GlobalTag_cff::file, InputChunk::fileIndex_, dqmdumpme::first, mps_fire::i, dqmdumpme::last, LogDebug, min(), mReader_, fileCollector::now, numConcurrentReads_, InputChunk::offset_, read(), readBlocks_, InputChunk::readComplete_, edm::second(), setExceptionState_, runEdmFileComparison::skipped, command_line::start, startupCv_, startupLock_, thread_quit_signal, threadInit_, tid_active_, InputChunk::usedSize_, workerJob_, and workerPool_.

Referenced by FedRawDataInputSource().

◆ reportEventsThisLumiInSource()

void FedRawDataInputSource::reportEventsThisLumiInSource ( unsigned int  lumi,
unsigned int  events 
)
private

Definition at line 1458 of file FedRawDataInputSource.cc.

1458  {
1459  std::lock_guard<std::mutex> lock(monlock_);
1460  auto itr = sourceEventsReport_.find(lumi);
1461  if (itr != sourceEventsReport_.end())
1462  itr->second += events;
1463  else
1464  sourceEventsReport_[lumi] = events;
1465 }

References patZpeak::events, CommonMethods::lock(), monlock_, and sourceEventsReport_.

Referenced by checkNext(), and getNextEvent().

◆ rewind_()

void FedRawDataInputSource::rewind_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 735 of file FedRawDataInputSource.cc.

735 {}

◆ threadError()

void FedRawDataInputSource::threadError ( )
private

Definition at line 1324 of file FedRawDataInputSource.cc.

1324  {
1325  quit_threads_ = true;
1326  throw cms::Exception("FedRawDataInputSource:threadError") << " file reader thread error ";
1327 }

References Exception, and quit_threads_.

Referenced by InputFile::advance(), and getNextEvent().

Friends And Related Function Documentation

◆ InputChunk

friend struct InputChunk
friend

Definition at line 38 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ InputFile

friend struct InputFile
friend

Definition at line 37 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

Member Data Documentation

◆ alwaysStartFromFirstLS_

const bool FedRawDataInputSource::alwaysStartFromFirstLS_
private

Definition at line 92 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ bufferInputRead_

uint32_t FedRawDataInputSource::bufferInputRead_ = 0
private

Definition at line 169 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readNextChunkIntoBuffer().

◆ checkEvery_

unsigned int FedRawDataInputSource::checkEvery_ = 10
private

Definition at line 160 of file FedRawDataInputSource.h.

Referenced by read().

◆ chunkIsFree_

bool FedRawDataInputSource::chunkIsFree_ = false
private

Definition at line 132 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentFile_

std::unique_ptr<InputFile> FedRawDataInputSource::currentFile_
private

Definition at line 131 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentFileIndex_

int FedRawDataInputSource::currentFileIndex_ = -1
private

Definition at line 154 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and read().

◆ currentLumiSection_

unsigned int FedRawDataInputSource::currentLumiSection_
private

Definition at line 114 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), maybeOpenNewLumiSection(), and read().

◆ cvReader_

std::vector<std::condition_variable*> FedRawDataInputSource::cvReader_
private

◆ cvWakeup_

std::condition_variable FedRawDataInputSource::cvWakeup_
private

Definition at line 164 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

◆ daqDirector_

evf::EvFDaqDirector* FedRawDataInputSource::daqDirector_ = nullptr
private

◆ daqProvenanceHelper_

const edm::DaqProvenanceHelper FedRawDataInputSource::daqProvenanceHelper_
private

Definition at line 107 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and read().

◆ defPath_

std::string FedRawDataInputSource::defPath_
private

Definition at line 80 of file FedRawDataInputSource.h.

◆ detectedFRDversion_

uint32 FedRawDataInputSource::detectedFRDversion_ = 0
private

Definition at line 130 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readWorker().

◆ event_

std::unique_ptr<FRDEventMsgView> FedRawDataInputSource::event_
private

Definition at line 109 of file FedRawDataInputSource.h.

Referenced by checkNext(), fillFEDRawDataCollection(), and getNextEvent().

◆ eventChunkBlock_

unsigned int FedRawDataInputSource::eventChunkBlock_
private

◆ eventChunkSize_

unsigned int FedRawDataInputSource::eventChunkSize_
private

◆ eventID_

edm::EventID FedRawDataInputSource::eventID_
private

Definition at line 111 of file FedRawDataInputSource.h.

Referenced by read().

◆ eventRunNumber_

uint32_t FedRawDataInputSource::eventRunNumber_ = 0
private

Definition at line 115 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

◆ eventsThisLumi_

unsigned int FedRawDataInputSource::eventsThisLumi_
private

Definition at line 119 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and read().

◆ eventsThisRun_

unsigned long FedRawDataInputSource::eventsThisRun_ = 0
private

Definition at line 120 of file FedRawDataInputSource.h.

◆ fileDeleteLock_

std::mutex FedRawDataInputSource::fileDeleteLock_
private

Definition at line 157 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and read().

◆ fileDescriptor_

int FedRawDataInputSource::fileDescriptor_ = -1
private

Definition at line 168 of file FedRawDataInputSource.h.

Referenced by readNextChunkIntoBuffer().

◆ fileListIndex_

unsigned int FedRawDataInputSource::fileListIndex_ = 0
private

Definition at line 100 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ fileListLoopMode_

const bool FedRawDataInputSource::fileListLoopMode_
private

Definition at line 101 of file FedRawDataInputSource.h.

Referenced by checkNext(), FedRawDataInputSource(), getFile(), and read().

◆ fileListMode_

const bool FedRawDataInputSource::fileListMode_
private

◆ fileNames_

std::vector<std::string> FedRawDataInputSource::fileNames_
private

Definition at line 95 of file FedRawDataInputSource.h.

Referenced by getFile(), and initFileList().

◆ fileNamesToDelete_

std::list<std::pair<int, std::string> > FedRawDataInputSource::fileNamesToDelete_
private

Definition at line 156 of file FedRawDataInputSource.h.

◆ fileQueue_

tbb::concurrent_queue<std::unique_ptr<InputFile> > FedRawDataInputSource::fileQueue_
private

Definition at line 142 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

◆ filesToDelete_

std::list<std::pair<int, std::unique_ptr<InputFile> > > FedRawDataInputSource::filesToDelete_
private

◆ fms_

evf::FastMonitoringService* FedRawDataInputSource::fms_ = nullptr
private

◆ freeChunks_

tbb::concurrent_queue<InputChunk*> FedRawDataInputSource::freeChunks_
private

Definition at line 141 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), read(), and readSupervisor().

◆ fuOutputDir_

std::string FedRawDataInputSource::fuOutputDir_
private

Definition at line 105 of file FedRawDataInputSource.h.

◆ getLSFromFilename_

const bool FedRawDataInputSource::getLSFromFilename_
private

Definition at line 91 of file FedRawDataInputSource.h.

Referenced by checkNext(), getNextEvent(), and readSupervisor().

◆ GTPEventID_

uint32_t FedRawDataInputSource::GTPEventID_ = 0
private

Definition at line 116 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

◆ L1EventID_

uint32_t FedRawDataInputSource::L1EventID_ = 0
private

Definition at line 117 of file FedRawDataInputSource.h.

Referenced by checkNext(), and read().

◆ loopModeIterationInc_

unsigned int FedRawDataInputSource::loopModeIterationInc_ = 0
private

Definition at line 102 of file FedRawDataInputSource.h.

Referenced by getFile().

◆ maxBufferedFiles_

unsigned int FedRawDataInputSource::maxBufferedFiles_
private

Definition at line 86 of file FedRawDataInputSource.h.

Referenced by readSupervisor().

◆ monlock_

std::mutex FedRawDataInputSource::monlock_
private

Definition at line 174 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

◆ mReader_

std::mutex FedRawDataInputSource::mReader_
private

Definition at line 144 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), readWorker(), and ~FedRawDataInputSource().

◆ mWakeup_

std::mutex FedRawDataInputSource::mWakeup_
private

Definition at line 163 of file FedRawDataInputSource.h.

Referenced by getNextEvent(), and readSupervisor().

◆ nStreams_

unsigned int FedRawDataInputSource::nStreams_ = 0
private

Definition at line 159 of file FedRawDataInputSource.h.

Referenced by read().

◆ numBuffers_

unsigned int FedRawDataInputSource::numBuffers_
private

Definition at line 85 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource().

◆ numConcurrentReads_

unsigned int FedRawDataInputSource::numConcurrentReads_
private

◆ processHistoryID_

edm::ProcessHistoryID FedRawDataInputSource::processHistoryID_
private

Definition at line 112 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), maybeOpenNewLumiSection(), and read().

◆ quit_threads_

std::atomic<bool> FedRawDataInputSource::quit_threads_
private

◆ readBlocks_

unsigned int FedRawDataInputSource::readBlocks_
private

◆ readingFilesCount_

std::atomic<unsigned int> FedRawDataInputSource::readingFilesCount_
private

Definition at line 88 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ readSupervisorThread_

std::unique_ptr<std::thread> FedRawDataInputSource::readSupervisorThread_
private

Definition at line 135 of file FedRawDataInputSource.h.

Referenced by checkNext(), and ~FedRawDataInputSource().

◆ runNumber_

edm::RunNumber_t FedRawDataInputSource::runNumber_
private

Definition at line 104 of file FedRawDataInputSource.h.

Referenced by checkNext(), and FedRawDataInputSource().

◆ setExceptionState_

bool FedRawDataInputSource::setExceptionState_ = false
private

Definition at line 150 of file FedRawDataInputSource.h.

Referenced by exceptionState(), getNextEvent(), readSupervisor(), and readWorker().

◆ singleBufferMode_

bool FedRawDataInputSource::singleBufferMode_
private

Definition at line 167 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), getNextEvent(), and readSupervisor().

◆ sourceEventsReport_

std::map<unsigned int, unsigned int> FedRawDataInputSource::sourceEventsReport_
private

Definition at line 173 of file FedRawDataInputSource.h.

Referenced by getEventReport(), and reportEventsThisLumiInSource().

◆ startedSupervisorThread_

bool FedRawDataInputSource::startedSupervisorThread_ = false
private

Definition at line 134 of file FedRawDataInputSource.h.

Referenced by checkNext(), and ~FedRawDataInputSource().

◆ startupCv_

std::condition_variable FedRawDataInputSource::startupCv_
private

◆ startupLock_

std::mutex FedRawDataInputSource::startupLock_
private

◆ streamFileTracker_

std::vector<int> FedRawDataInputSource::streamFileTracker_
private

Definition at line 158 of file FedRawDataInputSource.h.

Referenced by read().

◆ tcds_pointer_

unsigned char* FedRawDataInputSource::tcds_pointer_
private

Definition at line 118 of file FedRawDataInputSource.h.

Referenced by fillFEDRawDataCollection(), and read().

◆ thread_quit_signal

std::vector<unsigned int> FedRawDataInputSource::thread_quit_signal
private

◆ threadInit_

std::atomic<bool> FedRawDataInputSource::threadInit_
private

Definition at line 171 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), and readWorker().

◆ tid_active_

std::vector<unsigned int> FedRawDataInputSource::tid_active_
private

Definition at line 146 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ useFileBroker_

bool FedRawDataInputSource::useFileBroker_
private

◆ useL1EventID_

const bool FedRawDataInputSource::useL1EventID_
private

Definition at line 94 of file FedRawDataInputSource.h.

Referenced by read().

◆ verifyChecksum_

const bool FedRawDataInputSource::verifyChecksum_
private

Definition at line 93 of file FedRawDataInputSource.h.

Referenced by getNextEvent().

◆ workerJob_

std::vector<ReaderInfo> FedRawDataInputSource::workerJob_
private

Definition at line 139 of file FedRawDataInputSource.h.

Referenced by FedRawDataInputSource(), readSupervisor(), and readWorker().

◆ workerPool_

tbb::concurrent_queue<unsigned int> FedRawDataInputSource::workerPool_
private

Definition at line 138 of file FedRawDataInputSource.h.

Referenced by readSupervisor(), and readWorker().

◆ workerThreads_

std::vector<std::thread*> FedRawDataInputSource::workerThreads_
private
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:355
FedRawDataInputSource::fileDeleteLock_
std::mutex fileDeleteLock_
Definition: FedRawDataInputSource.h:157
FedRawDataInputSource::fillFEDRawDataCollection
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection &)
Definition: FedRawDataInputSource.cc:683
change_name.diff
diff
Definition: change_name.py:13
evf::EvFDaqDirector::unlockFULocal2
void unlockFULocal2()
Definition: EvFDaqDirector.cc:893
eostools.ls
def ls(path, rec=False)
Definition: eostools.py:349
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
counter
Definition: counter.py:1
FedRawDataInputSource::chunkIsFree_
bool chunkIsFree_
Definition: FedRawDataInputSource.h:132
dttmaxenums::L
Definition: DTTMax.h:29
evf::EvFDaqDirector::createBoLSFile
void createBoLSFile(const uint32_t lumiSection, bool checkIfExists) const
Definition: EvFDaqDirector.cc:898
FedRawDataInputSource::threadInit_
std::atomic< bool > threadInit_
Definition: FedRawDataInputSource.h:171
mps_fire.i
i
Definition: mps_fire.py:355
start
Definition: start.py:1
FEDNumbering::MINTriggerEGTPFEDID
Definition: FEDNumbering.h:63
evf::EvFDaqDirector::runEnded
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::getEoLSFilePathOnBU
std::string getEoLSFilePathOnBU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:470
evf::FastMonitoringThread::inSupFileLimit
Definition: FastMonitoringThread.h:54
FedRawDataInputSource::maybeOpenNewLumiSection
void maybeOpenNewLumiSection(const uint32_t lumiSection)
Definition: FedRawDataInputSource.cc:295
FedRawDataInputSource::quit_threads_
std::atomic< bool > quit_threads_
Definition: FedRawDataInputSource.h:148
FedRawDataInputSource::mWakeup_
std::mutex mWakeup_
Definition: FedRawDataInputSource.h:163
FedRawDataInputSource::fileNames_
std::vector< std::string > fileNames_
Definition: FedRawDataInputSource.h:95
FRDEventMsgView
Definition: FRDEventMessage.h:107
FedRawDataInputSource::workerThreads_
std::vector< std::thread * > workerThreads_
Definition: FedRawDataInputSource.h:136
FedRawDataInputSource::fileListIndex_
unsigned int fileListIndex_
Definition: FedRawDataInputSource.h:100
FedRawDataInputSource::threadError
void threadError()
Definition: FedRawDataInputSource.cc:1324
evf::FastMonitoringThread::inSupWaitFreeChunk
Definition: FastMonitoringThread.h:55
FedRawDataInputSource::readingFilesCount_
std::atomic< unsigned int > readingFilesCount_
Definition: FedRawDataInputSource.h:88
mps_update.status
status
Definition: mps_update.py:69
FedRawDataInputSource::useL1EventID_
const bool useL1EventID_
Definition: FedRawDataInputSource.h:94
min
T min(T a, T b)
Definition: MathUtil.h:58
evf::EvFDaqDirector::updateFuLock
FileStatus updateFuLock(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
Definition: EvFDaqDirector.cc:497
FEDRawDataCollection
Definition: FEDRawDataCollection.h:18
evf::EvFDaqDirector::getLumisectionToStart
unsigned int getLumisectionToStart() const
Definition: EvFDaqDirector.cc:1787
FedRawDataInputSource::numConcurrentReads_
unsigned int numConcurrentReads_
Definition: FedRawDataInputSource.h:87
evf::evtn::gtpe_board_sense
bool gtpe_board_sense(const unsigned char *p)
Definition: GlobalEventNumber.cc:11
FedRawDataInputSource::nStreams_
unsigned int nStreams_
Definition: FedRawDataInputSource.h:159
pos
Definition: PixelAliasList.h:18
FedRawDataInputSource::getNextEvent
evf::EvFDaqDirector::FileStatus getNextEvent()
Definition: FedRawDataInputSource.cc:341
FedRawDataInputSource::runNumber_
edm::RunNumber_t runNumber_
Definition: FedRawDataInputSource.h:104
FedRawDataInputSource::currentLumiSection_
unsigned int currentLumiSection_
Definition: FedRawDataInputSource.h:114
edm::LogInfo
Definition: MessageLogger.h:254
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
l1tstage2_dqm_sourceclient-live_cfg.rawData
rawData
Definition: l1tstage2_dqm_sourceclient-live_cfg.py:156
evf::FastMonitoringThread::inRunEnd
Definition: FastMonitoringThread.h:41
FedRawDataInputSource::workerPool_
tbb::concurrent_queue< unsigned int > workerPool_
Definition: FedRawDataInputSource.h:138
FedRawDataInputSource::sourceEventsReport_
std::map< unsigned int, unsigned int > sourceEventsReport_
Definition: FedRawDataInputSource.h:173
DeadROCCounter.getRunNumber
def getRunNumber(filename)
Definition: DeadROCCounter.py:7
FedRawDataInputSource::readNextChunkIntoBuffer
void readNextChunkIntoBuffer(InputFile *file)
Definition: FedRawDataInputSource.cc:1401
cms::cuda::assert
assert(be >=bs)
FedRawDataInputSource::fms_
evf::FastMonitoringService * fms_
Definition: FedRawDataInputSource.h:77
edm::second
U second(std::pair< T, U > const &p)
Definition: ParameterSet.cc:215
FRDHeaderVersionSize
const uint32 FRDHeaderVersionSize[6]
Definition: FRDEventMessage.h:104
edm::Timestamp::beginOfTime
static Timestamp beginOfTime()
Definition: Timestamp.h:84
FedRawDataInputSource::cvReader_
std::vector< std::condition_variable * > cvReader_
Definition: FedRawDataInputSource.h:145
FEDNumbering::MINTCDSuTCAFEDID
Definition: FEDNumbering.h:101
FedRawDataInputSource::eventChunkSize_
unsigned int eventChunkSize_
Definition: FedRawDataInputSource.h:82
evf::evtn::get
unsigned int get(const unsigned char *, bool)
Definition: GlobalEventNumber.cc:77
evf::FastMonitoringThread::inCachedEvent
Definition: FastMonitoringThread.h:46
FedRawDataInputSource::cvWakeup_
std::condition_variable cvWakeup_
Definition: FedRawDataInputSource.h:164
evf::EvFDaqDirector::sameFile
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::runAbort
Definition: EvFDaqDirector.h:64
FedRawDataInputSource::freeChunks_
tbb::concurrent_queue< InputChunk * > freeChunks_
Definition: FedRawDataInputSource.h:141
FedRawDataInputSource::detectedFRDversion_
uint32 detectedFRDversion_
Definition: FedRawDataInputSource.h:130
MillePedeFileConverter_cfg.fileName
fileName
Definition: MillePedeFileConverter_cfg.py:32
evf::evtn::getgpslow
unsigned int getgpslow(const unsigned char *)
Definition: GlobalEventNumber.cc:92
patZpeak.events
events
Definition: patZpeak.py:20
FedRawDataInputSource::defPath_
std::string defPath_
Definition: FedRawDataInputSource.h:80
newFWLiteAna.found
found
Definition: newFWLiteAna.py:118
dqmdumpme.first
first
Definition: dqmdumpme.py:55
edm::RawInputSource
Definition: RawInputSource.h:17
evf::FastMonitoringThread::inWaitChunk
Definition: FastMonitoringThread.h:43
evf::FastMonitoringService::setInputSource
void setInputSource(FedRawDataInputSource *inputSource)
Definition: FastMonitoringService.h:178
FedRawDataInputSource::bufferInputRead_
uint32_t bufferInputRead_
Definition: FedRawDataInputSource.h:169
edm::InputSource::processHistoryRegistryForUpdate
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
FEDRawData::data
const unsigned char * data() const
Return a const pointer to the beginning of the data buffer.
Definition: FEDRawData.cc:24
InputFile
Definition: FedRawDataInputSource.h:201
InputChunk::offset_
unsigned int offset_
Definition: FedRawDataInputSource.h:183
edm::Wrapper
Definition: Product.h:10
FedRawDataInputSource::workerJob_
std::vector< ReaderInfo > workerJob_
Definition: FedRawDataInputSource.h:139
end
#define end
Definition: vmac.h:39
uint32
unsigned int uint32
Definition: MsgTools.h:13
FEDRawData
Definition: FEDRawData.h:19
Utilities.operator
operator
Definition: Utilities.py:24
evf::FastMonitoringThread::inProcessingFile
Definition: FastMonitoringThread.h:42
FedRawDataInputSource::readBlocks_
unsigned int readBlocks_
Definition: FedRawDataInputSource.h:84
evf::FastMonitoringThread::inSupWaitFreeChunkCopying
Definition: FastMonitoringThread.h:56
edm::LuminosityBlockAuxiliary
Definition: LuminosityBlockAuxiliary.h:15
fileCollector.now
now
Definition: fileCollector.py:207
FedRawDataInputSource::streamFileTracker_
std::vector< int > streamFileTracker_
Definition: FedRawDataInputSource.h:158
InputChunk::usedSize_
uint32_t usedSize_
Definition: FedRawDataInputSource.h:181
InputChunk::buf_
unsigned char * buf_
Definition: FedRawDataInputSource.h:178
InputChunk::reset
void reset(unsigned int newOffset, unsigned int toRead, unsigned int fileIndex)
Definition: FedRawDataInputSource.h:191
FedRawDataInputSource::eventRunNumber_
uint32_t eventRunNumber_
Definition: FedRawDataInputSource.h:115
evf::EvFDaqDirector::getEoLSFilePathOnFU
std::string getEoLSFilePathOnFU(const unsigned int ls) const
Definition: EvFDaqDirector.cc:474
dqmdumpme.last
last
Definition: dqmdumpme.py:56
FedRawDataInputSource::InputFile
friend struct InputFile
Definition: FedRawDataInputSource.h:37
hgcalPlots.stat
stat
Definition: hgcalPlots.py:1111
edm::ConfigurationDescriptions::add
void add(std::string const &label, ParameterSetDescription const &psetDescription)
Definition: ConfigurationDescriptions.cc:57
evf::EvFDaqDirector::isSingleStreamThread
bool isSingleStreamThread()
Definition: EvFDaqDirector.h:117
rval
unsigned long long int rval
Definition: vlib.h:21
evf::FastMonitoringThread::inSupWaitFreeThread
Definition: FastMonitoringThread.h:57
evf::FastMonitoringThread::inWaitInput
Definition: FastMonitoringThread.h:37
evf::EvFDaqDirector::parseFRDFileHeader
static int parseFRDFileHeader(std::string const &rawSourcePath, int &rawFd, uint16_t &rawHeaderSize, uint32_t &lsFromHeader, int32_t &eventsFromHeader, int64_t &fileSizeFromHeader, bool requireHeader, bool retry, bool closeFile)
Definition: EvFDaqDirector.cc:926
FedRawDataInputSource::filesToDelete_
std::list< std::pair< int, std::unique_ptr< InputFile > > > filesToDelete_
Definition: FedRawDataInputSource.h:155
FedRawDataInputSource::getLSFromFilename_
const bool getLSFromFilename_
Definition: FedRawDataInputSource.h:91
FedRawDataInputSource::ReaderInfo
std::pair< InputFile *, InputChunk * > ReaderInfo
Definition: FedRawDataInputSource.h:128
InputChunk
Definition: FedRawDataInputSource.h:177
evf::FastMonitoringThread::inReadCleanup
Definition: FastMonitoringThread.h:48
FedRawDataInputSource::fileListLoopMode_
const bool fileListLoopMode_
Definition: FedRawDataInputSource.h:101
edm::InputSource::luminosityBlockAuxiliary
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
FedRawDataInputSource::daqDirector_
evf::EvFDaqDirector * daqDirector_
Definition: FedRawDataInputSource.h:78
tcds::Raw_v1
Definition: TCDSRaw.h:106
svgfig.load
def load(fileName)
Definition: svgfig.py:547
evf::evtn::makeEventAuxiliary
edm::EventAuxiliary makeEventAuxiliary(const tcds::Raw_v1 *, unsigned int runNumber, unsigned int lumiSection, const edm::EventAuxiliary::ExperimentType &, const std::string &processGUID, bool verifyLumiSection)
Definition: AuxiliaryMakers.cc:9
evf::EvFDaqDirector::noFile
Definition: EvFDaqDirector.h:64
FedRawDataInputSource::fileQueue_
tbb::concurrent_queue< std::unique_ptr< InputFile > > fileQueue_
Definition: FedRawDataInputSource.h:142
evf::EvFDaqDirector::useFileBroker
bool useFileBroker() const
Definition: EvFDaqDirector.h:79
evf::FastMonitoringThread::inChecksumEvent
Definition: FastMonitoringThread.h:45
InputChunk::readComplete_
std::atomic< bool > readComplete_
Definition: FedRawDataInputSource.h:185
b
double b
Definition: hdecay.h:118
edm::EventPrincipal::put
void put(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance) const
Definition: EventPrincipal.cc:178
FedRawDataInputSource::reportEventsThisLumiInSource
void reportEventsThisLumiInSource(unsigned int lumi, unsigned int events)
Definition: FedRawDataInputSource.cc:1458
evf::FastMonitoringThread::inNewLumi
Definition: FastMonitoringThread.h:38
FedRawDataInputSource::processHistoryID_
edm::ProcessHistoryID processHistoryID_
Definition: FedRawDataInputSource.h:112
evf::FastMonitoringThread::inSupNewFileWaitThreadCopying
Definition: FastMonitoringThread.h:64
edm::EventAuxiliary
Definition: EventAuxiliary.h:14
FedRawDataInputSource::startedSupervisorThread_
bool startedSupervisorThread_
Definition: FedRawDataInputSource.h:134
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::LogWarning
Definition: MessageLogger.h:141
FedRawDataInputSource::daqProvenanceHelper_
const edm::DaqProvenanceHelper daqProvenanceHelper_
Definition: FedRawDataInputSource.h:107
crc32c
uint32_t crc32c(uint32_t crc, const unsigned char *buf, size_t len)
Definition: crc32c.cc:340
edm::InputSource::runAuxiliary
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
FedRawDataInputSource::initFileList
long initFileList()
Definition: FedRawDataInputSource.cc:1479
FedRawDataInputSource::L1EventID_
uint32_t L1EventID_
Definition: FedRawDataInputSource.h:117
edm::ParameterSetDescription::addUntracked
ParameterDescriptionBase * addUntracked(U const &iLabel, T const &value)
Definition: ParameterSetDescription.h:100
InputChunk::fileIndex_
unsigned int fileIndex_
Definition: FedRawDataInputSource.h:184
LogDebug
#define LogDebug(id)
Definition: MessageLogger.h:670
edm::InputSource::setRunAuxiliary
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
edm::ParameterSetDescription::setComment
void setComment(std::string const &value)
Definition: ParameterSetDescription.cc:33
FedRawDataInputSource::currentFileIndex_
int currentFileIndex_
Definition: FedRawDataInputSource.h:154
edm::LogError
Definition: MessageLogger.h:183
FEDTrailer
Definition: FEDTrailer.h:14
a
double a
Definition: hdecay.h:119
FedRawDataInputSource::alwaysStartFromFirstLS_
const bool alwaysStartFromFirstLS_
Definition: FedRawDataInputSource.h:92
edm::InputSource::productRegistryUpdate
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
FedRawDataInputSource::eventChunkBlock_
unsigned int eventChunkBlock_
Definition: FedRawDataInputSource.h:83
CommonMethods.lock
def lock()
Definition: CommonMethods.py:82
edm::DaqProvenanceHelper::daqInit
ProcessHistoryID daqInit(ProductRegistry &productRegistry, ProcessHistoryRegistry &processHistoryRegistry) const
Definition: DaqProvenanceHelper.cc:83
evf::EvFDaqDirector::getNextFromFileBroker
FileStatus getNextFromFileBroker(const unsigned int currentLumiSection, unsigned int &ls, std::string &nextFile, int &rawFd, uint16_t &rawHeaderSize, int32_t &serverEventsInNewFile_, int64_t &fileSize, uint64_t &thisLockWaitTimeUs)
Definition: EvFDaqDirector.cc:1635
FedRawDataInputSource::verifyChecksum_
const bool verifyChecksum_
Definition: FedRawDataInputSource.h:93
mps_check.timeout
int timeout
Definition: mps_check.py:53
evf::EvFDaqDirector::grabNextJsonFileAndUnlock
int grabNextJsonFileAndUnlock(boost::filesystem::path const &jsonSourcePath)
Definition: EvFDaqDirector.cc:1283
edm::shutdown_flag
volatile std::atomic< bool > shutdown_flag
Definition: UnixSignalHandlers.cc:22
evf::FastMonitoringService::setInStateSup
void setInStateSup(FastMonitoringThread::InputState inputState)
Definition: FastMonitoringService.h:180
evf::FastMonitoringThread::inChunkReceived
Definition: FastMonitoringThread.h:44
printConversionInfo.aux
aux
Definition: printConversionInfo.py:19
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
FedRawDataInputSource::currentFile_
std::unique_ptr< InputFile > currentFile_
Definition: FedRawDataInputSource.h:131
evf::FastMonitoringThread::inSupNewFileWaitChunk
Definition: FastMonitoringThread.h:67
FedRawDataInputSource::setExceptionState_
bool setExceptionState_
Definition: FedRawDataInputSource.h:150
FrontierConditions_GlobalTag_cff.file
file
Definition: FrontierConditions_GlobalTag_cff.py:13
edm::InputSource::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:442
FedRawDataInputSource::eventID_
edm::EventID eventID_
Definition: FedRawDataInputSource.h:111
edm::RawInputSource::Next::kStop
evf::FastMonitoringThread::inSupNewFile
Definition: FastMonitoringThread.h:63
FedRawDataInputSource::mReader_
std::mutex mReader_
Definition: FedRawDataInputSource.h:144
FedRawDataInputSource::eventsThisLumi_
unsigned int eventsThisLumi_
Definition: FedRawDataInputSource.h:119
FEDHeader::length
static const uint32_t length
Definition: FEDHeader.h:54
evf::EvFDaqDirector::getEoRFilePathOnFU
std::string getEoRFilePathOnFU() const
Definition: EvFDaqDirector.cc:484
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:106
edm::ParameterSetDescription::setAllowAnything
void setAllowAnything()
allow any parameter label/value pairs
Definition: ParameterSetDescription.cc:37
l1tstage2_dqm_sourceclient-live_cfg.fedId
fedId
Definition: l1tstage2_dqm_sourceclient-live_cfg.py:82
evf::EvFDaqDirector::newLumi
Definition: EvFDaqDirector.h:64
itr
std::vector< std::pair< float, float > >::iterator itr
Definition: HGCDigitizer.cc:28
evf::FastMonitoringThread::inSupNewFileWaitThread
Definition: FastMonitoringThread.h:65
FedRawDataInputSource::getFile
evf::EvFDaqDirector::FileStatus getFile(unsigned int &ls, std::string &nextFile, uint32_t &fsize, uint64_t &lockWaitTime)
Definition: FedRawDataInputSource.cc:1509
evf::FastMonitoringThread::inSupLockPolling
Definition: FastMonitoringThread.h:60
FedRawDataInputSource::startupLock_
std::mutex startupLock_
Definition: FedRawDataInputSource.h:151
visDQMUpload.buf
buf
Definition: visDQMUpload.py:154
tcds
Definition: TCDSRaw.h:16
evf::FastMonitoringService::reportLockWait
void reportLockWait(unsigned int ls, double waitTime, unsigned int lockCount)
Definition: FastMonitoringService.cc:702
evf::evtn::evm_board_sense
bool evm_board_sense(const unsigned char *p, size_t size)
Definition: GlobalEventNumber.cc:15
FedRawDataInputSource::thread_quit_signal
std::vector< unsigned int > thread_quit_signal
Definition: FedRawDataInputSource.h:149
FedRawDataInputSource::useFileBroker_
bool useFileBroker_
Definition: FedRawDataInputSource.h:96
edm::InputSource::setLuminosityBlockAuxiliary
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
evf::FastMonitoringThread::inInit
Definition: FastMonitoringThread.h:36
edm::Timestamp::invalidTimestamp
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
edm::EventAuxiliary::PhysicsTrigger
Definition: EventAuxiliary.h:20
FedRawDataInputSource::readSupervisor
void readSupervisor()
Definition: FedRawDataInputSource.cc:737
edm::TypeID
Definition: TypeID.h:22
eostools.move
def move(src, dest)
Definition: eostools.py:511
evf::EvFDaqDirector::createProcessingNotificationMaybe
void createProcessingNotificationMaybe() const
Definition: EvFDaqDirector.cc:1942
crc32c_hw_test
bool crc32c_hw_test()
Definition: crc32c.cc:354
dqmiodatasetharvest.inf
inf
Definition: dqmiodatasetharvest.py:38
init
Definition: init.py:1
evf::FastMonitoringThread::inNoRequest
Definition: FastMonitoringThread.h:49
edm::RawInputSource::makeEvent
void makeEvent(EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
Definition: RawInputSource.cc:51
edm::DaqProvenanceHelper::branchDescription
BranchDescription const & branchDescription() const
Definition: DaqProvenanceHelper.h:46
FedRawDataInputSource::event_
std::unique_ptr< FRDEventMsgView > event_
Definition: FedRawDataInputSource.h:109
edm::InputSource::setNewRun
void setNewRun()
Definition: InputSource.h:351
FEDTrailer::length
static const uint32_t length
Definition: FEDTrailer.h:57
FedRawDataInputSource::nextEvent
evf::EvFDaqDirector::FileStatus nextEvent()
Definition: FedRawDataInputSource.cc:332
relativeConstraints.empty
bool empty
Definition: relativeConstraints.py:46
FedRawDataInputSource::tid_active_
std::vector< unsigned int > tid_active_
Definition: FedRawDataInputSource.h:146
Exception
Definition: hltDiff.cc:246
evf::FastMonitoringThread::inSupBusy
Definition: FastMonitoringThread.h:59
FedRawDataInputSource::checkEvery_
unsigned int checkEvery_
Definition: FedRawDataInputSource.h:160
FEDRawData::resize
void resize(size_t newsize)
Definition: FEDRawData.cc:28
evf::EvFDaqDirector::setFMS
void setFMS(evf::FastMonitoringService *fms)
Definition: EvFDaqDirector.h:116
FedRawDataInputSource::fileDescriptor_
int fileDescriptor_
Definition: FedRawDataInputSource.h:168
edm::InputSource::setEventCached
void setEventCached()
Marks the current event as cached (sets the eventCached_ flag).
Definition: InputSource.h:358
evf::FastMonitoringThread::inReadEvent
Definition: FastMonitoringThread.h:47
evf::FastMonitoringService::startedLookingForFile
void startedLookingForFile()
Definition: FastMonitoringService.cc:666
evf::FastMonitoringThread::inSupNoFile
Definition: FastMonitoringThread.h:62
evf::FastMonitoringService::setExceptionDetected
void setExceptionDetected(unsigned int ls)
Definition: FastMonitoringService.cc:392
edm::InputSource::resetLuminosityBlockAuxiliary
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
FedRawDataInputSource::startupCv_
std::condition_variable startupCv_
Definition: FedRawDataInputSource.h:152
evf::FastMonitoringThread::inSupNewFileWaitChunkCopying
Definition: FastMonitoringThread.h:66
FedRawDataInputSource::tcds_pointer_
unsigned char * tcds_pointer_
Definition: FedRawDataInputSource.h:118
cond::uint64_t
unsigned long long uint64_t
Definition: Time.h:13
funct::pow
Power< A, B >::type pow(const A &a, const B &b)
Definition: Power.h:30
FedRawDataInputSource::fileListMode_
const bool fileListMode_
Definition: FedRawDataInputSource.h:99
cms::Exception
Definition: Exception.h:70
castor_dqm_sourceclient_file_cfg.path
path
Definition: castor_dqm_sourceclient_file_cfg.py:37
FedRawDataInputSource::InputChunk
friend struct InputChunk
Definition: FedRawDataInputSource.h:38
FedRawDataInputSource::readSupervisorThread_
std::unique_ptr< std::thread > readSupervisorThread_
Definition: FedRawDataInputSource.h:135
command_line.start
start
Definition: command_line.py:167
evf::FastMonitoringService::setInState
void setInState(FastMonitoringThread::InputState inputState)
Definition: FastMonitoringService.h:179
edm::InputSource::processGUID
std::string const & processGUID() const
Accessor for global process identifier.
Definition: InputSource.h:204
dqmiolumiharvest.j
j
Definition: dqmiolumiharvest.py:66
runEdmFileComparison.skipped
skipped
Definition: runEdmFileComparison.py:225
edm::RawInputSource::Next::kEvent
ntuplemaker.time
time
Definition: ntuplemaker.py:310
evf::evtn::gtpe_get
unsigned int gtpe_get(const unsigned char *)
Definition: GlobalEventNumber.cc:83
event
Definition: event.py:1
edm::EventID
Definition: EventID.h:31
evf::FastMonitoringService::stoppedLookingForFile
void stoppedLookingForFile(unsigned int lumi)
Definition: FastMonitoringService.cc:674
FEDHeader
Definition: FEDHeader.h:14
evf::FastMonitoringThread::inSupWaitFreeThreadCopying
Definition: FastMonitoringThread.h:58
lumi
Definition: LumiSectionData.h:20
evf::EvFDaqDirector::FileStatus
FileStatus
Definition: EvFDaqDirector.h:64
evf::EvFDaqDirector::lockFULocal2
void lockFULocal2()
Definition: EvFDaqDirector.cc:888
FedRawDataInputSource::monlock_
std::mutex monlock_
Definition: FedRawDataInputSource.h:174
FedRawDataInputSource::read
void read(edm::EventPrincipal &eventPrincipal) override
Definition: FedRawDataInputSource.cc:609
FedRawDataInputSource::numBuffers_
unsigned int numBuffers_
Definition: FedRawDataInputSource.h:85
evf::evtn::getgpshigh
unsigned int getgpshigh(const unsigned char *)
Definition: GlobalEventNumber.cc:95
FedRawDataInputSource::maxBufferedFiles_
unsigned int maxBufferedFiles_
Definition: FedRawDataInputSource.h:86
edm::TimeValue_t
unsigned long long TimeValue_t
Definition: Timestamp.h:28
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
FedRawDataInputSource::singleBufferMode_
bool singleBufferMode_
Definition: FedRawDataInputSource.h:167
FedRawDataInputSource::readWorker
void readWorker(unsigned int tid)
Definition: FedRawDataInputSource.cc:1182
FedRawDataInputSource::loopModeIterationInc_
unsigned int loopModeIterationInc_
Definition: FedRawDataInputSource.h:102
FEDNumbering::MAXFEDID
Definition: FEDNumbering.h:26
evf::EvFDaqDirector::newFile
Definition: EvFDaqDirector.h:64
edm::RunAuxiliary
Definition: RunAuxiliary.h:15
edm::InputSource::run
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:437
FedRawDataInputSource::GTPEventID_
uint32_t GTPEventID_
Definition: FedRawDataInputSource.h:116
evf::EvFDaqDirector::setDeleteTracking
void setDeleteTracking(std::mutex *fileDeleteLock, std::list< std::pair< int, std::unique_ptr< InputFile >>> *filesToDelete)
Definition: EvFDaqDirector.h:168
FEDNumbering::MINTriggerGTPFEDID
Definition: FEDNumbering.h:61
edm::DaqProvenanceHelper::dummyProvenance
ProductProvenance const & dummyProvenance() const
Definition: DaqProvenanceHelper.h:48
edm::Timestamp
Definition: Timestamp.h:30