CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes
dqmservices::DQMStreamerReader Class Reference

#include <DQMStreamerReader.h>

Inheritance diagram for dqmservices::DQMStreamerReader:
edm::streamer::StreamerInputSource edm::RawInputSource edm::InputSource

Classes

struct  OpenFile
 

Public Member Functions

 DQMStreamerReader (edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
 
bool newHeader ()
 
 ~DQMStreamerReader () override
 
- Public Member Functions inherited from edm::streamer::StreamerInputSource
void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
void deserializeEvent (EventMsgView const &eventView)
 
void deserializeEventMetaData (EventMsgView const &eventView)
 
std::unique_ptr< SendJobHeader > deserializeRegistry (InitMsgView const &initView)
 
uint32_t eventMetaDataChecksum (EventMsgView const &eventView) const
 
bool isBufferLZMA (unsigned char const *inputBuffer, unsigned int inputSize)
 
bool isBufferZSTD (unsigned char const *inputBuffer, unsigned int inputSize)
 
uint32_t presentEventMetaDataChecksum () const
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
void updateEventMetaData ()
 
 ~StreamerInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemTypeInfo nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSource & operator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::streamer::StreamerInputSource
static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
static unsigned int uncompressBufferLZMA (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
static unsigned int uncompressBufferZSTD (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void genuineCloseFile () override
 
void genuineReadFile () override
 
void skip (int toSkip) override
 
- Protected Member Functions inherited from edm::streamer::StreamerInputSource
void resetAfterEndRun ()
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Sets the event-cached flag that eventCached() reports. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
ItemTypeInfo state () const
 

Private Member Functions

bool acceptEvent (const edm::streamer::EventMsgView *)
 
void closeFileImp_ (const std::string &reason)
 
edm::streamer::EventMsgView const * getEventMsg ()
 
edm::streamer::InitMsgView const * getHeaderMsg ()
 
void openFileImp_ (const DQMFileIterator::LumiEntry &entry)
 
bool openNextFileImp_ ()
 
edm::streamer::EventMsgView const * prepareNextEvent ()
 
bool prepareNextFile ()
 
void reset_ () override
 
bool setAcceptAllEvt ()
 
bool setMatchTriggerSel (std::vector< std::string > const &tnames)
 
void setupMetaData (edm::streamer::InitMsgView const &msg, bool subsequent)
 

Private Attributes

bool acceptAllEvt_ = false
 
bool artificialFileBoundary_ = false
 
std::shared_ptr< edm::EventSkipperByID > eventSkipperByID_
 
struct dqmservices::DQMStreamerReader::OpenFile file_
 
DQMFileIterator fiterator_
 
bool const flagDeleteDatFiles_
 
bool const flagEndOfRunKills_
 
bool const flagSkipFirstLumis_
 
std::vector< std::string > const hltSel_
 
bool isFirstFile_ = true
 
bool matchTriggerSel_ = false
 
unsigned int const minEventsPerLs_
 
unsigned int processedEventPerLs_ = 0
 
std::shared_ptr< TriggerSelector > triggerSelector_
 
bool const unitTest_
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemPosition : char { ItemPosition::Invalid, ItemPosition::LastItemToBeMerged, ItemPosition::NotLastItemToBeMerged }
 
enum  ItemType : char {
  ItemType::IsInvalid, ItemType::IsStop, ItemType::IsFile, ItemType::IsRun,
  ItemType::IsLumi, ItemType::IsEvent, ItemType::IsRepeat, ItemType::IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 
- Static Protected Member Functions inherited from edm::streamer::StreamerInputSource
static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Detailed Description

Definition at line 17 of file DQMStreamerReader.h.

Constructor & Destructor Documentation

◆ DQMStreamerReader()

dqmservices::DQMStreamerReader::DQMStreamerReader ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)

Definition at line 23 of file DQMStreamerReader.cc.

References reset_(), and setAcceptAllEvt().

26  minEventsPerLs_(pset.getUntrackedParameter<int>("minEventsPerLumi")),
27  flagSkipFirstLumis_(pset.getUntrackedParameter<bool>("skipFirstLumis")),
28  flagEndOfRunKills_(pset.getUntrackedParameter<bool>("endOfRunKills")),
29  flagDeleteDatFiles_(pset.getUntrackedParameter<bool>("deleteDatFiles")),
30  hltSel_(pset.getUntrackedParameter<std::vector<std::string>>("SelectEvents")),
31  unitTest_(pset.getUntrackedParameter<bool>("unitTest", false)) {
33  reset_();
34  }
unsigned int const minEventsPerLs_
std::vector< std::string > const hltSel_
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)

◆ ~DQMStreamerReader()

dqmservices::DQMStreamerReader::~DQMStreamerReader ( )
override

Definition at line 36 of file DQMStreamerReader.cc.

36  {
37  // Sometimes(?) the destructor called after service registry was already destructed
38  // and closeFile_ throws away no ServiceRegistry found exception...
39  //
40  // Normally, this file should be closed before this destructor is called.
41  //closeFileImp_("destructor");
42  }

Member Function Documentation

◆ acceptEvent()

bool dqmservices::DQMStreamerReader::acceptEvent ( const edm::streamer::EventMsgView * evtmsg)
private

Checks the event's HLT trigger bits against the trigger selection to decide whether to accept the event.

Definition at line 411 of file DQMStreamerReader.cc.

References acceptAllEvt_, edm::streamer::EventMsgView::hltCount(), edm::streamer::EventMsgView::hltTriggerBits(), matchTriggerSel_, and triggerSelector_.

Referenced by prepareNextEvent().

411  {
412  if (acceptAllEvt_)
413  return true;
414  if (!matchTriggerSel_)
415  return false;
416 
417  std::vector<unsigned char> hltTriggerBits_;
418  int hltTriggerCount_ = evtmsg->hltCount();
419  if (hltTriggerCount_ > 0) {
420  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
421  }
422  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
423 
424  return (triggerSelector_->wantAll() || triggerSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount()));
425  }
std::shared_ptr< TriggerSelector > triggerSelector_
void hltTriggerBits(uint8 *put_here) const

◆ checkNext()

edm::RawInputSource::Next dqmservices::DQMStreamerReader::checkNext ( )
override protected virtual

This is the actual code for checking the new event and/or deserializing it.

Implements edm::RawInputSource.

Definition at line 344 of file DQMStreamerReader.cc.

References artificialFileBoundary_, closeFileImp_(), edm::streamer::StreamerInputSource::deserializeEvent(), MillePedeFileConverter_cfg::e, file_, fiterator_, edm::RawInputSource::kEvent, edm::RawInputSource::kFile, edm::RawInputSource::kStop, dqmservices::DQMFileIterator::logFileAction(), or, prepareNextEvent(), processedEventPerLs_, dqmservices::DQMStreamerReader::OpenFile::streamFile_, and AlCaHLTBitMon_QueryRunRegistry::string.

344  {
345  try {
346  EventMsgView const* eview = prepareNextEvent();
347  if (eview == nullptr) {
348  if (artificialFileBoundary_ or (file_.streamFile_ and file_.streamFile_->newHeader())) {
349  return Next::kFile;
350  }
351  return Next::kStop;
352  }
353 
354  deserializeEvent(*eview);
355  } catch (const cms::Exception& e) {
356  // try to recover from corrupted files/events
357  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
358  closeFileImp_("data file corrupted");
359 
360  // this is not optimal, but hopefully we won't catch this many times in a row
361  return checkNext();
362  }
363 
365 
366  return Next::kEvent;
367  }
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
edm::streamer::EventMsgView const * prepareNextEvent()
std::unique_ptr< edm::streamer::StreamerInputFile > streamFile_
void closeFileImp_(const std::string &reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ closeFileImp_()

void dqmservices::DQMStreamerReader::closeFileImp_ ( const std::string &  reason)
private

◆ fillDescriptions()

void dqmservices::DQMStreamerReader::fillDescriptions ( edm::ConfigurationDescriptions & descriptions)
static

Definition at line 443 of file DQMStreamerReader.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, edm::EventSkipperByID::fillDescription(), edm::streamer::StreamerInputSource::fillDescription(), and dqmservices::DQMFileIterator::fillDescription().

443  {
445  desc.setComment("Reads events from streamer files.");
446 
447  desc.addUntracked<std::vector<std::string>>("SelectEvents")->setComment("HLT path to select events");
448 
449  desc.addUntracked<int>("minEventsPerLumi", 1)
450  ->setComment(
451  "Minimum number of events to process per lumisection, "
452  "before switching to a new input file. If the next file "
453  "does not yet exist, "
454  "the number of processed events will be bigger.");
455 
456  desc.addUntracked<bool>("skipFirstLumis", false)
457  ->setComment(
458  "Skip (and ignore the minEventsPerLumi parameter) for the files "
459  "which have been available at the begining of the processing. "
460  "If set to true, the reader will open last available file for "
461  "processing.");
462 
463  desc.addUntracked<bool>("deleteDatFiles", false)
464  ->setComment(
465  "Delete data files after they have been closed, in order to "
466  "save disk space.");
467 
468  desc.addUntracked<bool>("endOfRunKills", false)
469  ->setComment(
470  "Kill the processing as soon as the end-of-run file appears, even if "
471  "there are/will be unprocessed lumisections.");
472 
473  desc.addUntracked<bool>("unitTest", false)
474  ->setComment("Kill the processing if the input data cannot be deserialized");
475 
476  // desc.addUntracked<unsigned int>("skipEvents", 0U)
477  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
478  // "have been processed.");
479 
480  // This next parameter is read in the base class, but its default value
481  // depends on the derived class, so it is set here.
482  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
483 
487 
488  descriptions.add("source", desc);
489  }
static void fillDescription(ParameterSetDescription &description)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
static void fillDescription(ParameterSetDescription &desc)
static void fillDescription(edm::ParameterSetDescription &d)

◆ genuineCloseFile()

void dqmservices::DQMStreamerReader::genuineCloseFile ( )
override protected virtual

Reimplemented from edm::RawInputSource.

Definition at line 132 of file DQMStreamerReader.cc.

132 {}

◆ genuineReadFile()

void dqmservices::DQMStreamerReader::genuineReadFile ( )
override protected virtual

Reimplemented from edm::RawInputSource.

Definition at line 143 of file DQMStreamerReader.cc.

References artificialFileBoundary_, getHeaderMsg(), RecoTauValidation_cfi::header, isFirstFile_, setupMetaData(), and edm::streamer::StreamerInputSource::updateEventMetaData().

143  {
144  if (isFirstFile_) {
145  //The file was already opened in the constructor
146  isFirstFile_ = false;
147  return;
148  }
149 
152  artificialFileBoundary_ = false;
153  return;
154  }
155  //Get header/init from reader
156  InitMsgView const* header = getHeaderMsg();
157  setupMetaData(*header, true);
158  }
edm::streamer::InitMsgView const * getHeaderMsg()
void setupMetaData(edm::streamer::InitMsgView const &msg, bool subsequent)

◆ getEventMsg()

EventMsgView const * dqmservices::DQMStreamerReader::getEventMsg ( )
private

Definition at line 203 of file DQMStreamerReader.cc.

References file_, edm::RawInputSource::kFile, edm::RawInputSource::kStop, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, and dqmservices::DQMStreamerReader::OpenFile::streamFile_.

Referenced by prepareNextEvent(), and setupMetaData().

203  {
204  auto next = file_.streamFile_->next();
206  return nullptr;
207  }
208 
210  return nullptr;
211  }
212 
213  EventMsgView const* msg = file_.streamFile_->currentRecord();
214 
215  // if (msg != nullptr) dumpEventView(msg);
216  return msg;
217  }
std::unique_ptr< edm::streamer::StreamerInputFile > streamFile_
tuple msg
Definition: mps_check.py:286
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ getHeaderMsg()

InitMsgView const * dqmservices::DQMStreamerReader::getHeaderMsg ( )
private

Definition at line 192 of file DQMStreamerReader.cc.

References Exception, file_, edm::errors::FileReadError, RecoTauValidation_cfi::header, edm::streamer::Header::INIT, and dqmservices::DQMStreamerReader::OpenFile::streamFile_.

Referenced by genuineReadFile(), and openFileImp_().

192  {
193  InitMsgView const* header = file_.streamFile_->startMessage();
194 
195  if (header->code() != Header::INIT) { // INIT Msg
196  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
197  << "received wrong message type: expected INIT, got " << header->code() << "\n";
198  }
199 
200  return header;
201  }
std::unique_ptr< edm::streamer::StreamerInputFile > streamFile_
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ newHeader()

bool dqmservices::DQMStreamerReader::newHeader ( )

◆ openFileImp_()

void dqmservices::DQMStreamerReader::openFileImp_ ( const DQMFileIterator::LumiEntry & entry)
private

Definition at line 96 of file DQMStreamerReader.cc.

References acceptAllEvt_, mps_splice::entry, file_, flagDeleteDatFiles_, getHeaderMsg(), RecoTauValidation_cfi::header, hltSel_, isFirstFile_, dqmservices::DQMStreamerReader::OpenFile::lumi_, castor_dqm_sourceclient_file_cfg::path, processedEventPerLs_, setMatchTriggerSel(), setupMetaData(), dqmservices::DQMStreamerReader::OpenFile::streamFile_, AlCaHLTBitMon_QueryRunRegistry::string, and triggerSelector_.

Referenced by openNextFileImp_().

96  {
98 
99  std::string path = entry.get_data_path();
100 
101  file_.lumi_ = entry;
102  file_.streamFile_ = std::make_unique<StreamerInputFile>(path);
103 
104  InitMsgView const* header = getHeaderMsg();
105  if (isFirstFile_) {
106  setupMetaData(*header, false);
107  }
108 
109  // dump the list of HLT trigger name from the header
110  // dumpInitHeader(header);
111 
112  // if specific trigger selection is requested, check if the requested triggers match with trigger paths in the header file
113  if (!acceptAllEvt_) {
114  std::vector<std::string> tnames;
115  header->hltTriggerNames(tnames);
116 
117  triggerSelector_.reset(new TriggerSelector(hltSel_, tnames));
118 
119  // check if any trigger path name requested matches with trigger name in the header file
120  setMatchTriggerSel(tnames);
121  }
122 
123  // our initialization
125 
126  if (flagDeleteDatFiles_) {
127  // unlink the file
128  unlink(path.c_str());
129  }
130  }
std::vector< std::string > const hltSel_
edm::streamer::InitMsgView const * getHeaderMsg()
std::shared_ptr< TriggerSelector > triggerSelector_
bool setMatchTriggerSel(std::vector< std::string > const &tnames)
void setupMetaData(edm::streamer::InitMsgView const &msg, bool subsequent)
std::unique_ptr< edm::streamer::StreamerInputFile > streamFile_
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ openNextFileImp_()

bool dqmservices::DQMStreamerReader::openNextFileImp_ ( )
private

Definition at line 160 of file DQMStreamerReader.cc.

References closeFileImp_(), MillePedeFileConverter_cfg::e, Exception, edm::errors::FileReadError, fiterator_, dqmservices::DQMFileIterator::LumiEntry::get_data_path(), dqmservices::DQMFileIterator::logFileAction(), dqmservices::DQMFileIterator::logLumiState(), dqmservices::DQMFileIterator::open(), openFileImp_(), AlCaHLTBitMon_ParallelJobs::p, AlCaHLTBitMon_QueryRunRegistry::string, and unitTest_.

Referenced by prepareNextFile().

160  {
161  closeFileImp_("skipping to another file");
162 
163  DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
164  std::string p = currentLumi.get_data_path();
165 
166  if (std::filesystem::exists(p)) {
167  try {
168  openFileImp_(currentLumi);
169  return true;
170  } catch (const cms::Exception& e) {
171  if (unitTest_) {
172  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::openNextFileInp")
173  << std::string("Can't deserialize registry data (in open file): ") + e.what()
174  << "\n error: data file corrupted";
175  }
176 
177  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
178  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
179 
180  closeFileImp_("data file corrupted");
181  return false;
182  }
183  } else {
184  /* dat file missing */
185  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
186  fiterator_.logLumiState(currentLumi, "error: data file missing");
187 
188  return false;
189  }
190  }
void logLumiState(const LumiEntry &lumi, const std::string &msg)
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
void logFileAction(const std::string &msg, const std::string &fileName="") const
void closeFileImp_(const std::string &reason)

◆ prepareNextEvent()

EventMsgView const * dqmservices::DQMStreamerReader::prepareNextEvent ( )
private

Waits for and reads the next event header. Returns nullptr at end-of-run, or when a file boundary has to be reported first.

Definition at line 286 of file DQMStreamerReader.cc.

References acceptEvent(), artificialFileBoundary_, cms::cuda::assert(), closeFileImp_(), dqmservices::DQMFileIterator::delay(), edm::streamer::StreamerInputSource::deserializeEventMetaData(), edm::streamer::StreamerInputSource::eventMetaDataChecksum(), file_, fiterator_, getEventMsg(), edm::streamer::EventMsgView::isEventMetaData(), GetRecoTauVFromDQM_MC_cff::next, dqmservices::DQMStreamerReader::OpenFile::open(), or, prepareNextFile(), and edm::streamer::StreamerInputSource::presentEventMetaDataChecksum().

Referenced by checkNext(), and skip().

286  {
287  EventMsgView const* eview = nullptr;
289 
290  // wait for the next event
291  for (;;) {
292  // edm::LogAbsolute("DQMStreamerReader")
293  // << "State loop.";
294  bool next = prepareNextFile();
295  if (!next)
296  return nullptr;
297 
298  // sleep
299  if (!file_.open()) {
300  // the reader does not exist
301  fiterator_.delay();
302  } else {
303  // our reader exists, try to read out an event
304  eview = getEventMsg();
305 
306  if (eview == nullptr) {
307  // read unsuccessful
308  // this means end of file, so close the file
309  closeFileImp_("eof");
310  } else {
311  //NOTE: at this point need to see if meta data checksum changed. If it did
312  // we need to issue a 'new File' transition
313  if (eview->isEventMetaData()) {
314  auto lastEventMetaData = presentEventMetaDataChecksum();
315  if (eventMetaDataChecksum(*eview) != lastEventMetaData) {
316  deserializeEventMetaData(*eview);
318  return nullptr;
319  } else {
320  //skipping
321  eview = getEventMsg();
322  assert((eview == nullptr) or (not eview->isEventMetaData()));
323  if (eview == nullptr) {
324  closeFileImp_("eof");
325  continue;
326  }
327  }
328  }
329 
330  if (!acceptEvent(eview)) {
331  continue;
332  } else {
333  return eview;
334  }
335  }
336  }
337  }
338  return eview;
339  }
void deserializeEventMetaData(EventMsgView const &eventView)
bool acceptEvent(const edm::streamer::EventMsgView *)
assert(be >=bs)
edm::streamer::EventMsgView const * getEventMsg()
The Signals That Services Can Subscribe To This is based on ActivityRegistry and is current per Services can connect to the signals distributed by the ActivityRegistry in order to monitor the activity of the application Each possible callback has some defined which we here list in angle e< void, edm::EventID const &, edm::Timestamp const & > We also list in braces which AR_WATCH_USING_METHOD_ is used for those or
Definition: Activities.doc:12
uint32_t eventMetaDataChecksum(EventMsgView const &eventView) const
void closeFileImp_(const std::string &reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ prepareNextFile()

bool dqmservices::DQMStreamerReader::prepareNextFile ( )
private

Prepare (open) the next file for reading. It is used by prepareNextEvent and in the constructor.

Does not block/wait.

Returns false if this is the end of the run and/or no more files are available. However, a return value of "true" does not imply that a file has been opened; the caller may still need to wait until some future file becomes available.

Definition at line 229 of file DQMStreamerReader.cc.

References closeFileImp_(), file_, fiterator_, flagEndOfRunKills_, svgfig::load(), dqmservices::DQMFileIterator::logFileAction(), dqmservices::DQMFileIterator::lumiReady(), minEventsPerLs_, dqmservices::DQMStreamerReader::OpenFile::open(), openNextFileImp_(), processedEventPerLs_, edm::shutdown_flag, dqmservices::DQMFileIterator::state(), and dqmservices::DQMFileIterator::update_state().

Referenced by prepareNextEvent(), and reset_().

229  {
231 
232  for (;;) {
234 
235  if (edm::shutdown_flag.load()) {
236  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
237 
238  closeFileImp_("shutdown flag is set");
239  return false;
240  }
241 
242  // check for end of run file and force quit
243  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
244  closeFileImp_("forced end-of-run");
245  return false;
246  }
247 
248  // check for end of run and quit if everything has been processed.
249  // this clean exit
250  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
251  return false;
252  }
253 
254  // if this is end of run and no more files to process
255  // close it
257  (fiterator_.state() == State::EOR)) {
258  closeFileImp_("graceful end-of-run");
259  return false;
260  }
261 
262  // skip to the next file if we have no files openned yet
263  if (!file_.open()) {
264  if (fiterator_.lumiReady()) {
266  // we might need to open once more (if .dat is missing)
267  continue;
268  }
269  }
270 
271  // or if there is a next file and enough eventshas been processed.
274  // we might need to open once more (if .dat is missing)
275  continue;
276  }
277 
278  return true;
279  }
280  }
unsigned int const minEventsPerLs_
volatile std::atomic< bool > shutdown_flag
void logFileAction(const std::string &msg, const std::string &fileName="") const
def load(fileName)
Definition: svgfig.py:547
void closeFileImp_(const std::string &reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ reset_()

void dqmservices::DQMStreamerReader::reset_ ( )
override private virtual

Reimplemented from edm::RawInputSource.

Definition at line 44 of file DQMStreamerReader.cc.

References dqmservices::DQMFileIterator::advanceToLumi(), dqmservices::DQMFileIterator::delay(), file_, fiterator_, flagSkipFirstLumis_, MainPageGenerator::l, dqmservices::DQMFileIterator::lastLumiFound(), dqmservices::DQMFileIterator::logFileAction(), GetRecoTauVFromDQM_MC_cff::next, dqmservices::DQMStreamerReader::OpenFile::open(), prepareNextFile(), and dqmservices::DQMFileIterator::update_state().

Referenced by DQMStreamerReader().

44  {
45  // We have to load at least a single header,
46  // so the ProductRegistry gets initialized.
47  //
48  // This must happen here (inside the constructor),
49  // as ProductRegistry gets frozen after we initialize:
50  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
51 
52  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
53 
55 
56  // Fast-forward to the last open file.
57  if (flagSkipFirstLumis_) {
58  unsigned int l = fiterator_.lastLumiFound();
59  if (l > 1) {
60  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
61  }
62  }
63 
64  for (;;) {
65  bool next = prepareNextFile();
66 
67  // check for end of run
68  if (!next) {
69  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
70  return;
71  }
72 
73  // check if we have a file opened
74  if (file_.open()) {
75  // we are now initialised
76  break;
77  }
78 
79  // wait
80  fiterator_.delay();
81  }
82 
83  fiterator_.logFileAction("DQMStreamerReader initialised.");
84  }
void logFileAction(const std::string &msg, const std::string &fileName="") const
void advanceToLumi(unsigned int lumi, std::string reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ setAcceptAllEvt()

bool dqmservices::DQMStreamerReader::setAcceptAllEvt ( )
private

If any entry of the HLT trigger selection is '*' (ignoring whitespace), set the accept-all-events flag to true; otherwise set it to false. Returns the flag.

Definition at line 373 of file DQMStreamerReader.cc.

References acceptAllEvt_, DummyCfis::c, TriggerAnalyzer::hltPath, and hltSel_.

Referenced by DQMStreamerReader().

373  {
374  acceptAllEvt_ = false;
375  for (auto hltPath : hltSel_) {
376  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
377  hltPath.end());
378  if (hltPath == "*") {
379  acceptAllEvt_ = true;
380  break;
381  }
382  }
383  return acceptAllEvt_;
384  }
std::vector< std::string > const hltSel_

◆ setMatchTriggerSel()

bool dqmservices::DQMStreamerReader::setMatchTriggerSel ( std::vector< std::string > const &  tnames)
private

Check whether the HLT selection matches any trigger name taken from the header file.

Definition at line 389 of file DQMStreamerReader.cc.

References DummyCfis::c, TriggerAnalyzer::hltPath, hltSel_, oniaPATMuonsWithTrigger_cff::matches, matchTriggerSel_, and edm::regexMatch().

Referenced by openFileImp_().

389  {
390  matchTriggerSel_ = false;
391  for (auto hltPath : hltSel_) {
392  hltPath.erase(std::remove_if(hltPath.begin(), hltPath.end(), [](unsigned char c) { return std::isspace(c); }),
393  hltPath.end());
394  auto const matches = edm::regexMatch(tnames, hltPath);
395  if (not matches.empty()) {
396  matchTriggerSel_ = true;
397  break;
398  }
399  }
400 
401  if (not matchTriggerSel_) {
402  edm::LogWarning("DQMStreamerReader") << "Trigger selection does not match any trigger path!!!";
403  }
404 
405  return matchTriggerSel_;
406  }
std::vector< std::string > const hltSel_
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
Log< level::Warning, false > LogWarning

◆ setupMetaData()

void dqmservices::DQMStreamerReader::setupMetaData ( edm::streamer::InitMsgView const &  msg,
bool  subsequent 
)
private

Definition at line 86 of file DQMStreamerReader.cc.

References cms::cuda::assert(), edm::streamer::StreamerInputSource::deserializeAndMergeWithRegistry(), edm::streamer::StreamerInputSource::deserializeEventMetaData(), getEventMsg(), mps_check::msg, and edm::streamer::StreamerInputSource::updateEventMetaData().

Referenced by genuineReadFile(), and openFileImp_().

86  {
88  auto event = getEventMsg();
89  //file might be empty
90  if (not event)
91  return;
92  assert(event->isEventMetaData());
95  }
void deserializeEventMetaData(EventMsgView const &eventView)
assert(be >=bs)
edm::streamer::EventMsgView const * getEventMsg()
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
tuple msg
Definition: mps_check.py:286
Definition: event.py:1

◆ skip()

void dqmservices::DQMStreamerReader::skip ( int  toSkip)
override protected virtual

Reimplemented from edm::InputSource.

Definition at line 427 of file DQMStreamerReader.cc.

References closeFileImp_(), MillePedeFileConverter_cfg::e, fiterator_, mps_fire::i, dqmservices::DQMFileIterator::logFileAction(), prepareNextEvent(), and AlCaHLTBitMon_QueryRunRegistry::string.

427  {
428  try {
429  for (int i = 0; i != toSkip; ++i) {
430  EventMsgView const* evMsg = prepareNextEvent();
431 
432  if (evMsg == nullptr) {
433  return;
434  }
435  }
436  } catch (const cms::Exception& e) {
437  // try to recover from corrupted files/events
438  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
439  closeFileImp_("data file corrupted");
440  }
441  }
void logFileAction(const std::string &msg, const std::string &fileName="") const
edm::streamer::EventMsgView const * prepareNextEvent()
void closeFileImp_(const std::string &reason)

Member Data Documentation

◆ acceptAllEvt_

bool dqmservices::DQMStreamerReader::acceptAllEvt_ = false
private

Definition at line 62 of file DQMStreamerReader.h.

Referenced by acceptEvent(), openFileImp_(), and setAcceptAllEvt().

◆ artificialFileBoundary_

bool dqmservices::DQMStreamerReader::artificialFileBoundary_ = false
private

Definition at line 70 of file DQMStreamerReader.h.

Referenced by checkNext(), genuineReadFile(), and prepareNextEvent().

◆ eventSkipperByID_

std::shared_ptr<edm::EventSkipperByID> dqmservices::DQMStreamerReader::eventSkipperByID_
private

Definition at line 79 of file DQMStreamerReader.h.

◆ file_

struct dqmservices::DQMStreamerReader::OpenFile dqmservices::DQMStreamerReader::file_
private

◆ fiterator_

DQMFileIterator dqmservices::DQMStreamerReader::fiterator_
private

◆ flagDeleteDatFiles_

bool const dqmservices::DQMStreamerReader::flagDeleteDatFiles_
private

Definition at line 58 of file DQMStreamerReader.h.

Referenced by openFileImp_().

◆ flagEndOfRunKills_

bool const dqmservices::DQMStreamerReader::flagEndOfRunKills_
private

Definition at line 57 of file DQMStreamerReader.h.

Referenced by prepareNextFile().

◆ flagSkipFirstLumis_

bool const dqmservices::DQMStreamerReader::flagSkipFirstLumis_
private

Definition at line 56 of file DQMStreamerReader.h.

Referenced by reset_().

◆ hltSel_

std::vector<std::string> const dqmservices::DQMStreamerReader::hltSel_
private

Definition at line 59 of file DQMStreamerReader.h.

Referenced by openFileImp_(), setAcceptAllEvt(), and setMatchTriggerSel().

◆ isFirstFile_

bool dqmservices::DQMStreamerReader::isFirstFile_ = true
private

Definition at line 48 of file DQMStreamerReader.h.

Referenced by genuineReadFile(), and openFileImp_().

◆ matchTriggerSel_

bool dqmservices::DQMStreamerReader::matchTriggerSel_ = false
private

Definition at line 65 of file DQMStreamerReader.h.

Referenced by acceptEvent(), and setMatchTriggerSel().

◆ minEventsPerLs_

unsigned int const dqmservices::DQMStreamerReader::minEventsPerLs_
private

Definition at line 55 of file DQMStreamerReader.h.

Referenced by prepareNextFile().

◆ processedEventPerLs_

unsigned int dqmservices::DQMStreamerReader::processedEventPerLs_ = 0
private

Definition at line 53 of file DQMStreamerReader.h.

Referenced by checkNext(), openFileImp_(), and prepareNextFile().

◆ triggerSelector_

std::shared_ptr<TriggerSelector> dqmservices::DQMStreamerReader::triggerSelector_
private

Definition at line 80 of file DQMStreamerReader.h.

Referenced by acceptEvent(), and openFileImp_().

◆ unitTest_

bool const dqmservices::DQMStreamerReader::unitTest_
private

Definition at line 60 of file DQMStreamerReader.h.

Referenced by openNextFileImp_().