CMS 3D CMS Logo

List of all members | Classes | Public Types | Public Member Functions | Static Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes
dqmservices::DQMStreamerReader Class Reference

#include <DQMStreamerReader.h>

Inheritance diagram for dqmservices::DQMStreamerReader:
edm::StreamerInputSource edm::RawInputSource edm::InputSource

Classes

struct  OpenFile
 

Public Types

typedef std::vector< std::string > Strings
 
- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 

Public Member Functions

 DQMStreamerReader (edm::ParameterSet const &pset, edm::InputSourceDescription const &desc)
 
bool newHeader ()
 
 ~DQMStreamerReader () override
 
- Public Member Functions inherited from edm::StreamerInputSource
void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
void deserializeEvent (EventMsgView const &eventView)
 
std::unique_ptr< SendJobHeaderdeserializeRegistry (InitMsgView const &initView)
 
bool isBufferLZMA (unsigned char const *inputBuffer, unsigned int inputSize)
 
bool isBufferZSTD (unsigned char const *inputBuffer, unsigned int inputSize)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~StreamerInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void fillProcessBlockHelper ()
 Fill the ProcessBlockHelper with info for the current file. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliaryluminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
bool nextProcessBlock (ProcessBlockPrincipal &)
 Next process block, return false if there is none, sets the processName in the principal. More...
 
InputSourceoperator= (InputSource const &)=delete
 
std::shared_ptr< ProcessBlockHelper const > processBlockHelper () const
 Accessors for processBlockHelper. More...
 
std::shared_ptr< ProcessBlockHelper > & processBlockHelper ()
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistryprocessHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::shared_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliaryreadLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readProcessBlock (ProcessBlockPrincipal &)
 Read next process block. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
void switchTo (std::shared_ptr< ProductRegistry > iOther)
 switch to a different ProductRegistry. More...
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescriptions (edm::ConfigurationDescriptions &descriptions)
 
- Static Public Member Functions inherited from edm::StreamerInputSource
static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
static unsigned int uncompressBufferLZMA (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
static unsigned int uncompressBufferZSTD (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

Next checkNext () override
 
void genuineCloseFile () override
 
void genuineReadFile () override
 
void skip (int toSkip) override
 
- Protected Member Functions inherited from edm::StreamerInputSource
void resetAfterEndRun ()
 
- Protected Member Functions inherited from edm::RawInputSource
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 
ProductRegistryproductRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
ItemType state () const
 

Private Member Functions

bool acceptEvent (const EventMsgView *)
 
void closeFileImp_ (const std::string &reason)
 
EventMsgView const * getEventMsg ()
 
InitMsgView const * getHeaderMsg ()
 
bool matchTriggerSel (Strings const &tnames)
 
void openFileImp_ (const DQMFileIterator::LumiEntry &entry)
 
bool openNextFileImp_ ()
 
EventMsgView const * prepareNextEvent ()
 
bool prepareNextFile ()
 
void reset_ () override
 
bool triggerSel ()
 

Private Attributes

bool acceptAllEvt_
 
std::shared_ptr< TriggerSelectoreventSelector_
 
std::shared_ptr< edm::EventSkipperByIDeventSkipperByID_
 
struct dqmservices::DQMStreamerReader::OpenFile file_
 
DQMFileIterator fiterator_
 
bool flagDeleteDatFiles_
 
bool flagEndOfRunKills_
 
bool flagSkipFirstLumis_
 
Strings hltSel_
 
bool isFirstFile_ = true
 
bool matchTriggerSel_
 
unsigned int minEventsPerLs_
 
edm::Service< DQMMonitoringServicemon_
 
unsigned int processedEventPerLs_
 
std::string runInputDir_
 
unsigned int runNumber_
 
std::string streamLabel_
 

Additional Inherited Members

- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 
- Static Protected Member Functions inherited from edm::StreamerInputSource
static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Detailed Description

Definition at line 24 of file DQMStreamerReader.h.

Member Typedef Documentation

◆ Strings

typedef std::vector<std::string> dqmservices::DQMStreamerReader::Strings

Definition at line 32 of file DQMStreamerReader.h.

Constructor & Destructor Documentation

◆ DQMStreamerReader()

dqmservices::DQMStreamerReader::DQMStreamerReader ( edm::ParameterSet const &  pset,
edm::InputSourceDescription const &  desc 
)

Definition at line 25 of file DQMStreamerReader.cc.

References flagDeleteDatFiles_, flagEndOfRunKills_, flagSkipFirstLumis_, hltSel_, minEventsPerLs_, muonDTDigis_cfi::pset, reset_(), runInputDir_, runNumber_, AlCaHLTBitMon_QueryRunRegistry::string, and triggerSel().

27  runNumber_ = pset.getUntrackedParameter<unsigned int>("runNumber");
28  runInputDir_ = pset.getUntrackedParameter<std::string>("runInputDir");
29  hltSel_ = pset.getUntrackedParameter<std::vector<std::string> >("SelectEvents");
30 
31  minEventsPerLs_ = pset.getUntrackedParameter<int>("minEventsPerLumi");
32  flagSkipFirstLumis_ = pset.getUntrackedParameter<bool>("skipFirstLumis");
33  flagEndOfRunKills_ = pset.getUntrackedParameter<bool>("endOfRunKills");
34  flagDeleteDatFiles_ = pset.getUntrackedParameter<bool>("deleteDatFiles");
35 
36  triggerSel();
37 
38  reset_();
39  }
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)

◆ ~DQMStreamerReader()

dqmservices::DQMStreamerReader::~DQMStreamerReader ( )
override

Definition at line 41 of file DQMStreamerReader.cc.

41  {
42  // Sometimes(?) the destructor called after service registry was already destructed
43  // and closeFile_ throws away no ServiceRegistry found exception...
44  //
45  // Normally, this file should be closed before this destructor is called.
46  //closeFileImp_("destructor");
47  }

Member Function Documentation

◆ acceptEvent()

bool dqmservices::DQMStreamerReader::acceptEvent ( const EventMsgView evtmsg)
private

Check the trigger path to accept event

Definition at line 383 of file DQMStreamerReader.cc.

References acceptAllEvt_, eventSelector_, EventMsgView::hltCount(), EventMsgView::hltTriggerBits(), and matchTriggerSel_.

Referenced by prepareNextEvent().

383  {
384  if (acceptAllEvt_)
385  return true;
386  if (!matchTriggerSel_)
387  return false;
388 
389  std::vector<unsigned char> hltTriggerBits_;
390  int hltTriggerCount_ = evtmsg->hltCount();
391  if (hltTriggerCount_ > 0) {
392  hltTriggerBits_.resize(1 + (hltTriggerCount_ - 1) / 4);
393  }
394  evtmsg->hltTriggerBits(&hltTriggerBits_[0]);
395 
396  if (eventSelector_->wantAll() || eventSelector_->acceptEvent(&hltTriggerBits_[0], evtmsg->hltCount())) {
397  return true;
398  } else {
399  return false;
400  }
401  }
void hltTriggerBits(uint8 *put_here) const
std::shared_ptr< TriggerSelector > eventSelector_
uint32 hltCount() const
Definition: EventMessage.h:94

◆ checkNext()

edm::RawInputSource::Next dqmservices::DQMStreamerReader::checkNext ( )
overrideprotectedvirtual

This is the actual code for checking the new event and/or deserializing it.

Implements edm::RawInputSource.

Definition at line 313 of file DQMStreamerReader.cc.

References closeFileImp_(), edm::StreamerInputSource::deserializeEvent(), MillePedeFileConverter_cfg::e, file_, fiterator_, edm::RawInputSource::kEvent, edm::RawInputSource::kFile, edm::RawInputSource::kStop, dqmservices::DQMFileIterator::logFileAction(), prepareNextEvent(), processedEventPerLs_, dqmservices::DQMStreamerReader::OpenFile::streamFile_, and AlCaHLTBitMon_QueryRunRegistry::string.

313  {
314  try {
315  EventMsgView const* eview = prepareNextEvent();
316  if (eview == nullptr) {
317  if (file_.streamFile_ and file_.streamFile_->newHeader()) {
318  return Next::kFile;
319  }
320  return Next::kStop;
321  }
322 
323  deserializeEvent(*eview);
324  } catch (const cms::Exception& e) {
325  // try to recover from corrupted files/events
326  fiterator_.logFileAction(std::string("Can't deserialize event or registry data: ") + e.what());
327  closeFileImp_("data file corrupted");
328 
329  // this is not optimal, but hopefully we won't catch this many times in a row
330  return checkNext();
331  }
332 
334 
335  return Next::kEvent;
336  }
std::unique_ptr< edm::StreamerInputFile > streamFile_
void deserializeEvent(EventMsgView const &eventView)
void logFileAction(const std::string &msg, const std::string &fileName="") const
EventMsgView const * prepareNextEvent()
void closeFileImp_(const std::string &reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ closeFileImp_()

void dqmservices::DQMStreamerReader::closeFileImp_ ( const std::string &  reason)
private

◆ fillDescriptions()

void dqmservices::DQMStreamerReader::fillDescriptions ( edm::ConfigurationDescriptions descriptions)
static

Definition at line 419 of file DQMStreamerReader.cc.

References edm::ConfigurationDescriptions::add(), submitPVResolutionJobs::desc, edm::EventSkipperByID::fillDescription(), edm::StreamerInputSource::fillDescription(), and dqmservices::DQMFileIterator::fillDescription().

419  {
421  desc.setComment("Reads events from streamer files.");
422 
423  desc.addUntracked<std::vector<std::string> >("SelectEvents")->setComment("HLT path to select events ");
424 
425  desc.addUntracked<int>("minEventsPerLumi", 1)
426  ->setComment(
427  "Minimum number of events to process per lumisection, "
428  "before switching to a new input file. If the next file "
429  "does not yet exist, "
430  "the number of processed events will be bigger.");
431 
432  desc.addUntracked<bool>("skipFirstLumis", false)
433  ->setComment(
434  "Skip (and ignore the minEventsPerLumi parameter) for the files "
435  "which have been available at the begining of the processing. "
436  "If set to true, the reader will open last available file for "
437  "processing.");
438 
439  desc.addUntracked<bool>("deleteDatFiles", false)
440  ->setComment(
441  "Delete data files after they have been closed, in order to "
442  "save disk space.");
443 
444  desc.addUntracked<bool>("endOfRunKills", false)
445  ->setComment(
446  "Kill the processing as soon as the end-of-run file appears, even if "
447  "there are/will be unprocessed lumisections.");
448 
449  // desc.addUntracked<unsigned int>("skipEvents", 0U)
450  // ->setComment("Skip the first 'skipEvents' events that otherwise would "
451  // "have been processed.");
452 
453  // This next parameter is read in the base class, but its default value
454  // depends on the derived class, so it is set here.
455  desc.addUntracked<bool>("inputFileTransitionsEachEvent", false);
456 
460 
461  descriptions.add("source", desc);
462  }
static void fillDescription(ParameterSetDescription &description)
void add(std::string const &label, ParameterSetDescription const &psetDescription)
static void fillDescription(ParameterSetDescription &desc)
static void fillDescription(edm::ParameterSetDescription &d)

◆ genuineCloseFile()

void dqmservices::DQMStreamerReader::genuineCloseFile ( )
overrideprotectedvirtual

Reimplemented from edm::RawInputSource.

Definition at line 131 of file DQMStreamerReader.cc.

131 {}

◆ genuineReadFile()

void dqmservices::DQMStreamerReader::genuineReadFile ( )
overrideprotectedvirtual

Reimplemented from edm::RawInputSource.

Definition at line 142 of file DQMStreamerReader.cc.

References edm::StreamerInputSource::deserializeAndMergeWithRegistry(), getHeaderMsg(), RecoTauValidation_cfi::header, and isFirstFile_.

142  {
143  if (isFirstFile_) {
144  //The file was already opened in the constructor
145  isFirstFile_ = false;
146  return;
147  }
148 
149  //Get header/init from reader
150  InitMsgView const* header = getHeaderMsg();
152  }
InitMsgView const * getHeaderMsg()
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)

◆ getEventMsg()

EventMsgView const * dqmservices::DQMStreamerReader::getEventMsg ( )
private

Definition at line 191 of file DQMStreamerReader.cc.

References file_, edm::RawInputSource::kFile, edm::RawInputSource::kStop, mps_check::msg, GetRecoTauVFromDQM_MC_cff::next, and dqmservices::DQMStreamerReader::OpenFile::streamFile_.

Referenced by prepareNextEvent().

191  {
192  auto next = file_.streamFile_->next();
194  return nullptr;
195  }
196 
198  return nullptr;
199  }
200 
201  EventMsgView const* msg = file_.streamFile_->currentRecord();
202 
203  // if (msg != nullptr) dumpEventView(msg);
204  return msg;
205  }
std::unique_ptr< edm::StreamerInputFile > streamFile_
tuple msg
Definition: mps_check.py:286
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ getHeaderMsg()

InitMsgView const * dqmservices::DQMStreamerReader::getHeaderMsg ( )
private

Definition at line 180 of file DQMStreamerReader.cc.

References Exception, file_, edm::errors::FileReadError, RecoTauValidation_cfi::header, Header::INIT, and dqmservices::DQMStreamerReader::OpenFile::streamFile_.

Referenced by genuineReadFile(), and openFileImp_().

180  {
181  InitMsgView const* header = file_.streamFile_->startMessage();
182 
183  if (header->code() != Header::INIT) { // INIT Msg
184  throw edm::Exception(edm::errors::FileReadError, "DQMStreamerReader::readHeader")
185  << "received wrong message type: expected INIT, got " << header->code() << "\n";
186  }
187 
188  return header;
189  }
std::unique_ptr< edm::StreamerInputFile > streamFile_
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ matchTriggerSel()

bool dqmservices::DQMStreamerReader::matchTriggerSel ( Strings const &  tnames)
private

Check if hlt selection matches any trigger name taken from the header file

Definition at line 359 of file DQMStreamerReader.cc.

References c, mps_fire::end, TriggerAnalyzer::hltPath, hltSel_, mps_fire::i, oniaPATMuonsWithTrigger_cff::matches, matchTriggerSel_, edm::regexMatch(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by openFileImp_().

359  {
360  matchTriggerSel_ = false;
361  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
363  hltPath.erase(
364  std::remove_if(
365  hltPath.begin(), hltPath.end(), [](char c) { return std::isspace(static_cast<unsigned char>(c)); }),
366  hltPath.end());
367  std::vector<Strings::const_iterator> matches = edm::regexMatch(tnames, hltPath);
368  if (!matches.empty()) {
369  matchTriggerSel_ = true;
370  }
371  }
372 
373  if (!matchTriggerSel_) {
374  edm::LogWarning("Trigger selection does not match any trigger path!!!") << std::endl;
375  }
376 
377  return matchTriggerSel_;
378  }
std::vector< std::vector< std::string >::const_iterator > regexMatch(std::vector< std::string > const &strings, std::regex const &regexp)
Definition: RegexMatch.cc:26
Log< level::Warning, false > LogWarning

◆ newHeader()

bool dqmservices::DQMStreamerReader::newHeader ( )

◆ openFileImp_()

void dqmservices::DQMStreamerReader::openFileImp_ ( const DQMFileIterator::LumiEntry entry)
private

Definition at line 91 of file DQMStreamerReader.cc.

References acceptAllEvt_, edm::StreamerInputSource::deserializeAndMergeWithRegistry(), mps_splice::entry, eventSelector_, file_, flagDeleteDatFiles_, getHeaderMsg(), RecoTauValidation_cfi::header, hltSel_, isFirstFile_, dqmservices::DQMStreamerReader::OpenFile::lumi_, matchTriggerSel(), castor_dqm_sourceclient_file_cfg::path, processedEventPerLs_, muonDTDigis_cfi::pset, dqmservices::DQMStreamerReader::OpenFile::streamFile_, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by openNextFileImp_().

91  {
94 
95  std::string path = entry.get_data_path();
96 
97  file_.lumi_ = entry;
98  file_.streamFile_ = std::make_unique<edm::StreamerInputFile>(path);
99 
100  InitMsgView const* header = getHeaderMsg();
101  if (isFirstFile_) {
103  }
104 
105  // dump the list of HLT trigger name from the header
106  // dumpInitHeader(header);
107 
108  // if specific trigger selection is requested, check if the requested triggers
109  // match with trigger paths in the header file
110  if (!acceptAllEvt_) {
111  Strings tnames;
112  header->hltTriggerNames(tnames);
113 
114  pset.addParameter<Strings>("SelectEvents", hltSel_);
115  eventSelector_.reset(new TriggerSelector(pset, tnames));
116 
117  // check if any trigger path name requested matches with trigger name in the
118  // header file
119  matchTriggerSel(tnames);
120  }
121 
122  // our initialization
124 
125  if (flagDeleteDatFiles_) {
126  // unlink the file
127  unlink(path.c_str());
128  }
129  }
std::vector< std::string > Strings
Definition: MsgTools.h:18
std::shared_ptr< TriggerSelector > eventSelector_
InitMsgView const * getHeaderMsg()
std::unique_ptr< edm::StreamerInputFile > streamFile_
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
bool matchTriggerSel(Strings const &tnames)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ openNextFileImp_()

bool dqmservices::DQMStreamerReader::openNextFileImp_ ( )
private

Definition at line 154 of file DQMStreamerReader.cc.

References closeFileImp_(), MillePedeFileConverter_cfg::e, fiterator_, dqmservices::DQMFileIterator::LumiEntry::get_data_path(), dqmservices::DQMFileIterator::logFileAction(), dqmservices::DQMFileIterator::logLumiState(), dqmservices::DQMFileIterator::open(), openFileImp_(), AlCaHLTBitMon_ParallelJobs::p, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by prepareNextFile().

154  {
155  closeFileImp_("skipping to another file");
156 
157  DQMFileIterator::LumiEntry currentLumi = fiterator_.open();
158  std::string p = currentLumi.get_data_path();
159 
160  if (std::filesystem::exists(p)) {
161  try {
162  openFileImp_(currentLumi);
163  return true;
164  } catch (const cms::Exception& e) {
165  fiterator_.logFileAction(std::string("Can't deserialize registry data (in open file): ") + e.what(), p);
166  fiterator_.logLumiState(currentLumi, "error: data file corrupted");
167 
168  closeFileImp_("data file corrupted");
169  return false;
170  }
171  } else {
172  /* dat file missing */
173  fiterator_.logFileAction("Data file (specified in json) is missing:", p);
174  fiterator_.logLumiState(currentLumi, "error: data file missing");
175 
176  return false;
177  }
178  }
void logLumiState(const LumiEntry &lumi, const std::string &msg)
void openFileImp_(const DQMFileIterator::LumiEntry &entry)
void logFileAction(const std::string &msg, const std::string &fileName="") const
void closeFileImp_(const std::string &reason)

◆ prepareNextEvent()

EventMsgView const * dqmservices::DQMStreamerReader::prepareNextEvent ( )
private

Waits and reads the event header. If end-of-run nullptr is returned.

Definition at line 274 of file DQMStreamerReader.cc.

References acceptEvent(), closeFileImp_(), dqmservices::DQMFileIterator::delay(), file_, fiterator_, getEventMsg(), GetRecoTauVFromDQM_MC_cff::next, dqmservices::DQMStreamerReader::OpenFile::open(), and prepareNextFile().

Referenced by checkNext(), and skip().

274  {
275  EventMsgView const* eview = nullptr;
277 
278  // wait for the next event
279  for (;;) {
280  // edm::LogAbsolute("DQMStreamerReader")
281  // << "State loop.";
282  bool next = prepareNextFile();
283  if (!next)
284  return nullptr;
285 
286  // sleep
287  if (!file_.open()) {
288  // the reader does not exist
289  fiterator_.delay();
290  } else {
291  // our reader exists, try to read out an event
292  eview = getEventMsg();
293 
294  if (eview == nullptr) {
295  // read unsuccessful
296  // this means end of file, so close the file
297  closeFileImp_("eof");
298  } else {
299  if (!acceptEvent(eview)) {
300  continue;
301  } else {
302  return eview;
303  }
304  }
305  }
306  }
307  return eview;
308  }
EventMsgView const * getEventMsg()
bool acceptEvent(const EventMsgView *)
void closeFileImp_(const std::string &reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ prepareNextFile()

bool dqmservices::DQMStreamerReader::prepareNextFile ( )
private

Prepare (open) the next file for reading. It is used by prepareNextEvent and in the constructor.

Does not block/wait.

Return false if this is end of run and/or no more file are available. However, return of "true" does not imply the file has been openned, but we need to wait until some future file becomes available.

Definition at line 217 of file DQMStreamerReader.cc.

References closeFileImp_(), file_, fiterator_, flagEndOfRunKills_, svgfig::load(), dqmservices::DQMFileIterator::logFileAction(), dqmservices::DQMFileIterator::lumiReady(), minEventsPerLs_, dqmservices::DQMStreamerReader::OpenFile::open(), openNextFileImp_(), processedEventPerLs_, edm::shutdown_flag, dqmservices::DQMFileIterator::state(), and dqmservices::DQMFileIterator::update_state().

Referenced by prepareNextEvent(), and reset_().

217  {
219 
220  for (;;) {
222 
223  if (edm::shutdown_flag.load()) {
224  fiterator_.logFileAction("Shutdown flag was set, shutting down.");
225 
226  closeFileImp_("shutdown flag is set");
227  return false;
228  }
229 
230  // check for end of run file and force quit
231  if (flagEndOfRunKills_ && (fiterator_.state() != State::OPEN)) {
232  closeFileImp_("forced end-of-run");
233  return false;
234  }
235 
236  // check for end of run and quit if everything has been processed.
237  // this clean exit
238  if ((!file_.open()) && (!fiterator_.lumiReady()) && (fiterator_.state() == State::EOR)) {
239  return false;
240  }
241 
242  // if this is end of run and no more files to process
243  // close it
245  (fiterator_.state() == State::EOR)) {
246  closeFileImp_("graceful end-of-run");
247  return false;
248  }
249 
250  // skip to the next file if we have no files openned yet
251  if (!file_.open()) {
252  if (fiterator_.lumiReady()) {
254  // we might need to open once more (if .dat is missing)
255  continue;
256  }
257  }
258 
259  // or if there is a next file and enough eventshas been processed.
262  // we might need to open once more (if .dat is missing)
263  continue;
264  }
265 
266  return true;
267  }
268  }
volatile std::atomic< bool > shutdown_flag
void logFileAction(const std::string &msg, const std::string &fileName="") const
def load(fileName)
Definition: svgfig.py:547
void closeFileImp_(const std::string &reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ reset_()

void dqmservices::DQMStreamerReader::reset_ ( )
overrideprivatevirtual

Reimplemented from edm::RawInputSource.

Definition at line 49 of file DQMStreamerReader.cc.

References dqmservices::DQMFileIterator::advanceToLumi(), dqmservices::DQMFileIterator::delay(), file_, fiterator_, flagSkipFirstLumis_, cmsLHEtoEOSManager::l, dqmservices::DQMFileIterator::lastLumiFound(), dqmservices::DQMFileIterator::logFileAction(), GetRecoTauVFromDQM_MC_cff::next, dqmservices::DQMStreamerReader::OpenFile::open(), prepareNextFile(), and dqmservices::DQMFileIterator::update_state().

Referenced by DQMStreamerReader().

49  {
50  // We have to load at least a single header,
51  // so the ProductRegistry gets initialized.
52  //
53  // This must happen here (inside the constructor),
54  // as ProductRegistry gets frozen after we initialize:
55  // https://cmssdt.cern.ch/SDT/lxr/source/FWCore/Framework/src/Schedule.cc#441
56 
57  fiterator_.logFileAction("Waiting for the first lumi in order to initialize.");
58 
60 
61  // Fast-forward to the last open file.
62  if (flagSkipFirstLumis_) {
63  unsigned int l = fiterator_.lastLumiFound();
64  if (l > 1) {
65  fiterator_.advanceToLumi(l, "skipped: fast-forward to the latest lumi");
66  }
67  }
68 
69  for (;;) {
70  bool next = prepareNextFile();
71 
72  // check for end of run
73  if (!next) {
74  fiterator_.logFileAction("End of run reached before DQMStreamerReader was initialised.");
75  return;
76  }
77 
78  // check if we have a file openned
79  if (file_.open()) {
80  // we are now initialised
81  break;
82  }
83 
84  // wait
85  fiterator_.delay();
86  }
87 
88  fiterator_.logFileAction("DQMStreamerReader initialised.");
89  }
void logFileAction(const std::string &msg, const std::string &fileName="") const
void advanceToLumi(unsigned int lumi, std::string reason)
struct dqmservices::DQMStreamerReader::OpenFile file_

◆ skip()

void dqmservices::DQMStreamerReader::skip ( int  toSkip)
overrideprotectedvirtual

Reimplemented from edm::InputSource.

Definition at line 403 of file DQMStreamerReader.cc.

References closeFileImp_(), MillePedeFileConverter_cfg::e, fiterator_, mps_fire::i, dqmservices::DQMFileIterator::logFileAction(), prepareNextEvent(), and AlCaHLTBitMon_QueryRunRegistry::string.

403  {
404  try {
405  for (int i = 0; i != toSkip; ++i) {
406  EventMsgView const* evMsg = prepareNextEvent();
407 
408  if (evMsg == nullptr) {
409  return;
410  }
411  }
412  } catch (const cms::Exception& e) {
413  // try to recover from corrupted files/events
414  fiterator_.logFileAction(std::string("Can't deserialize event data: ") + e.what());
415  closeFileImp_("data file corrupted");
416  }
417  }
void logFileAction(const std::string &msg, const std::string &fileName="") const
EventMsgView const * prepareNextEvent()
void closeFileImp_(const std::string &reason)

◆ triggerSel()

bool dqmservices::DQMStreamerReader::triggerSel ( )
private

If hlt trigger selection is '*', return a boolean variable to accept all events

Definition at line 342 of file DQMStreamerReader.cc.

References acceptAllEvt_, c, mps_fire::end, TriggerAnalyzer::hltPath, hltSel_, mps_fire::i, and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by DQMStreamerReader().

342  {
343  acceptAllEvt_ = false;
344  for (Strings::const_iterator i(hltSel_.begin()), end(hltSel_.end()); i != end; ++i) {
346  hltPath.erase(
347  std::remove_if(
348  hltPath.begin(), hltPath.end(), [](char c) { return std::isspace(static_cast<unsigned char>(c)); }),
349  hltPath.end());
350  if (hltPath == "*")
351  acceptAllEvt_ = true;
352  }
353  return acceptAllEvt_;
354  }

Member Data Documentation

◆ acceptAllEvt_

bool dqmservices::DQMStreamerReader::acceptAllEvt_
private

Definition at line 59 of file DQMStreamerReader.h.

Referenced by acceptEvent(), openFileImp_(), and triggerSel().

◆ eventSelector_

std::shared_ptr<TriggerSelector> dqmservices::DQMStreamerReader::eventSelector_
private

Definition at line 86 of file DQMStreamerReader.h.

Referenced by acceptEvent(), and openFileImp_().

◆ eventSkipperByID_

std::shared_ptr<edm::EventSkipperByID> dqmservices::DQMStreamerReader::eventSkipperByID_
private

Definition at line 85 of file DQMStreamerReader.h.

◆ file_

struct dqmservices::DQMStreamerReader::OpenFile dqmservices::DQMStreamerReader::file_
private

◆ fiterator_

DQMFileIterator dqmservices::DQMStreamerReader::fiterator_
private

◆ flagDeleteDatFiles_

bool dqmservices::DQMStreamerReader::flagDeleteDatFiles_
private

Definition at line 73 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader(), and openFileImp_().

◆ flagEndOfRunKills_

bool dqmservices::DQMStreamerReader::flagEndOfRunKills_
private

Definition at line 72 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader(), and prepareNextFile().

◆ flagSkipFirstLumis_

bool dqmservices::DQMStreamerReader::flagSkipFirstLumis_
private

Definition at line 71 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader(), and reset_().

◆ hltSel_

Strings dqmservices::DQMStreamerReader::hltSel_
private

Definition at line 66 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader(), matchTriggerSel(), openFileImp_(), and triggerSel().

◆ isFirstFile_

bool dqmservices::DQMStreamerReader::isFirstFile_ = true
private

Definition at line 61 of file DQMStreamerReader.h.

Referenced by genuineReadFile(), and openFileImp_().

◆ matchTriggerSel_

bool dqmservices::DQMStreamerReader::matchTriggerSel_
private

Definition at line 60 of file DQMStreamerReader.h.

Referenced by acceptEvent(), and matchTriggerSel().

◆ minEventsPerLs_

unsigned int dqmservices::DQMStreamerReader::minEventsPerLs_
private

Definition at line 69 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader(), and prepareNextFile().

◆ mon_

edm::Service<DQMMonitoringService> dqmservices::DQMStreamerReader::mon_
private

Definition at line 89 of file DQMStreamerReader.h.

◆ processedEventPerLs_

unsigned int dqmservices::DQMStreamerReader::processedEventPerLs_
private

Definition at line 68 of file DQMStreamerReader.h.

Referenced by checkNext(), openFileImp_(), and prepareNextFile().

◆ runInputDir_

std::string dqmservices::DQMStreamerReader::runInputDir_
private

Definition at line 64 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader().

◆ runNumber_

unsigned int dqmservices::DQMStreamerReader::runNumber_
private

Definition at line 63 of file DQMStreamerReader.h.

Referenced by DQMStreamerReader().

◆ streamLabel_

std::string dqmservices::DQMStreamerReader::streamLabel_
private

Definition at line 65 of file DQMStreamerReader.h.