CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputSource Class Reference

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
edm::RawInputSource edm::InputSource dqmservices::DQMStreamerReader edm::StreamerFileReader edm::StreamerInputModule< Producer >

Classes

class  EventPrincipalHolder
 

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
void deserializeEvent (EventMsgView const &eventView)
 
std::unique_ptr< SendJobHeader > deserializeRegistry (InitMsgView const &initView)
 
bool isBufferLZMA (unsigned char const *inputBuffer, unsigned int inputSize)
 
bool isBufferZSTD (unsigned char const *inputBuffer, unsigned int inputSize)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~StreamerInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
static unsigned int uncompressBufferLZMA (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
static unsigned int uncompressBufferZSTD (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

void resetAfterEndRun ()
 
- Protected Member Functions inherited from edm::RawInputSource
virtual bool checkNextEvent ()=0
 
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Private Member Functions

void read (EventPrincipal &eventPrincipal) override
 
std::unique_ptr< FileBlock > readFile_ () override
 
void setRun (RunNumber_t r) override
 

Private Attributes

bool adjustEventToNewProductRegistry_
 
std::vector< unsigned char > dest_
 
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
 
std::string processName_
 
unsigned int protocolVersion_
 
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
 
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
 
edm::propagate_const< TClass * > tc_
 
TBufferFile xbuf_
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 31 of file StreamerInputSource.h.

Constructor & Destructor Documentation

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
)
explicit

Definition at line 45 of file StreamerInputSource.cc.

46  : RawInputSource(pset, desc),
47  tc_(getTClass(typeid(SendEvent))),
48  dest_(init_size),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
53  processName_(),
54  protocolVersion_(0U) {}
RawInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
TClass * getTClass(const std::type_info &ti)
edm::propagate_const< TClass * > tc_
std::vector< unsigned char > dest_
edm::StreamerInputSource::~StreamerInputSource ( )
override

Definition at line 56 of file StreamerInputSource.cc.

56 {}

Member Function Documentation

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs)
static protected

Definition at line 110 of file StreamerInputSource.cc.

References edm::doBuildRealData(), FDEBUG, B2GTnPMonitor_cfi::item, AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

110  {
111  for (auto const& item : descs) {
112  //pi->init();
113  std::string const real_name = wrappedClassName(item.className());
114  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
115  doBuildRealData(real_name);
116  }
117  }
void doBuildRealData(const std::string &name)
std::string wrappedClassName(std::string const &iFullName)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs)
static protected

Definition at line 91 of file StreamerInputSource.cc.

References FDEBUG, B2GTnPMonitor_cfi::item, edm::loadCap(), AlCaHLTBitMon_QueryRunRegistry::string, edm::throwMissingDictionariesException(), and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

91  {
92  std::vector<std::string> missingDictionaries;
93  std::vector<std::string> branchNamesForMissing;
94  std::vector<std::string> producedTypes;
95  for (auto const& item : descs) {
96  //pi->init();
97  std::string const real_name = wrappedClassName(item.className());
98  FDEBUG(6) << "declare: " << real_name << std::endl;
99  if (!loadCap(real_name, missingDictionaries)) {
100  branchNamesForMissing.emplace_back(item.branchName());
101  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
102  }
103  }
104  if (!missingDictionaries.empty()) {
105  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
106  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
107  }
108  }
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
std::string wrappedClassName(std::string const &iFullName)
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

Deserializes the specified init message into a SendJobHeader object and merges registries.

Definition at line 169 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::InputSource::branchIDListHelper(), deserializeRegistry(), edm::pset::Registry::insertMapped(), edm::pset::Registry::instance(), B2GTnPMonitor_cfi::item, mergeIntoRegistry(), edm::InputSource::productRegistryUpdate(), muonDTDigis_cfi::pset, sd, and edm::InputSource::thinnedAssociationsHelper().

Referenced by edm::StreamerFileReader::checkNextEvent(), dqmservices::DQMStreamerReader::checkNextEvent(), edm::StreamerInputModule< Producer >::checkNextEvent(), dqmservices::DQMStreamerReader::openFileImp_(), edm::StreamerFileReader::reset_(), and edm::StreamerInputModule< Producer >::StreamerInputModule().

169  {
170  std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
172  if (subsequent) {
174  }
175  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
176  pset::Registry& psetRegistry = *pset::Registry::instance();
177  for (auto const& item : psetMap) {
178  ParameterSet pset(item.second.pset());
179  pset.setID(item.first);
180  psetRegistry.insertMapped(pset);
181  }
182  }
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
double sd
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:165
static Registry * instance()
Definition: Registry.cc:12
void edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message.

Definition at line 187 of file StreamerInputSource.cc.

References cms::Adler32(), EventMsgView::adler32_chksum(), EventMsgView::code(), filterCSVwithJSON::copy, dest_, Header::EVENT, EventMsgView::event(), EventMsgView::eventData(), EventMsgView::eventLength(), eventPrincipalHolder_, Exception, FDEBUG, EventMsgView::hostName(), edm::Timestamp::invalidTimestamp(), isBufferLZMA(), isBufferZSTD(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), EventMsgView::origDataSize(), edm::InputSource::processHistoryRegistryForUpdate(), edm::ProcessHistoryRegistry::registerProcessHistory(), edm::InputSource::resetLuminosityBlockAuxiliary(), EventMsgView::run(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), sendEvent_, edm::InputSource::setEventCached(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::RunAuxiliary::setProcessHistoryID(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), tc_, uncompressBuffer(), uncompressBufferLZMA(), uncompressBufferZSTD(), and xbuf_.

Referenced by edm::StreamerFileReader::checkNextEvent(), dqmservices::DQMStreamerReader::checkNextEvent(), and edm::StreamerInputModule< Producer >::checkNextEvent().

187  {
188  if (eventView.code() != Header::EVENT)
189  throw cms::Exception("StreamTranslation", "Event deserialization error")
190  << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
191  FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
192  << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
193  << std::endl;
194  // uncompress if we need to
195  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
196  // need to get rid of this when 090 MTCC streamers are gotten rid of
197  unsigned long origsize = eventView.origDataSize();
198  unsigned long dest_size; //(should be >= eventView.origDataSize())
199 
200  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
201  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
202  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
203  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
204  if ((uint32)adler32_chksum != eventView.adler32_chksum()) {
205  // skip event (based on option?) or throw exception?
206  throw cms::Exception("StreamDeserialization", "Checksum error")
207  << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
208  << " host name = " << eventView.hostName() << std::endl;
209  }
210  if (origsize != 78 && origsize != 0) {
211  // compressed
212  if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
213  dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
214  eventView.eventLength(),
215  dest_,
216  origsize);
217  } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
218  dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
219  eventView.eventLength(),
220  dest_,
221  origsize);
222  } else
223  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
224  eventView.eventLength(),
225  dest_,
226  origsize);
227  } else { // not compressed
228  // we need to copy anyway the buffer as we are using dest in xbuf
229  dest_size = eventView.eventLength();
230  dest_.resize(dest_size);
231  unsigned char* pos = (unsigned char*)&dest_[0];
232  unsigned char const* from = (unsigned char const*)eventView.eventData();
233  std::copy(from, from + dest_size, pos);
234  }
235  //TBuffer xbuf(TBuffer::kRead, dest_size,
236  // (char const*) &dest[0],kFALSE);
237  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
238  // (char const*) eventView.eventData(),kFALSE);
239  xbuf_.Reset();
240  xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
241  RootDebug tracer(10, 10);
242 
243  //We do not yet know which EventPrincipal we will use, therefore
244  // we are using a new EventPrincipalHolder as a proxy. We need to
245  // make a new one instead of reusing the same one because when running
246  // multi-threaded there will be multiple EventPrincipals being used
247  // simultaneously.
248  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
250  {
251  std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
253  ;
254  });
255  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
256  }
257 
258  if (sendEvent_.get() == nullptr) {
259  throw cms::Exception("StreamTranslation", "Event deserialization error")
260  << "got a null event from input stream\n";
261  }
263 
264  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
265  if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
266  RunAuxiliary* runAuxiliary =
267  new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
268  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
269  setRunAuxiliary(runAuxiliary);
271  }
272  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
273  LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
274  runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
275  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
276  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
277  }
278  setEventCached();
279  }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
bool isBufferZSTD(unsigned char const *inputBuffer, unsigned int inputSize)
bool isBufferLZMA(unsigned char const *inputBuffer, unsigned int inputSize)
bool registerProcessHistory(ProcessHistory const &processHistory)
void setRefCoreStreamer(bool resetAll=false)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:436
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
static unsigned int uncompressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:358
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:441
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
edm::propagate_const< TClass * > tc_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
static unsigned int uncompressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
std::vector< unsigned char > dest_
std::unique_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView)

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 123 of file StreamerInputSource.cc.

References cms::Adler32(), InitMsgView::adler32_chksum(), InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), protocolVersion_, and sd.

Referenced by deserializeAndMergeWithRegistry().

123  {
124  if (initView.code() != Header::INIT)
125  throw cms::Exception("StreamTranslation", "Registry deserialization error")
126  << "received wrong message type: expected INIT, got " << initView.code() << "\n";
127 
128  //Get the process name and store it for Protocol version 4 and above.
129  if (initView.protocolVersion() > 3) {
130  processName_ = initView.processName();
131  protocolVersion_ = initView.protocolVersion();
132 
133  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
134  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
135  }
136 
137  // calculate the adler32 checksum
138  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
139  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
140  //std::cout << "Adler32 checksum of init message from header = " << initView.adler32_chksum() << " "
141  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
142  if ((uint32)adler32_chksum != initView.adler32_chksum()) {
143  // skip event (based on option?) or throw exception?
144  throw cms::Exception("StreamDeserialization", "Checksum error")
145  << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
146  << " host name = " << initView.hostName() << std::endl;
147  }
148 
149  TClass* desc = getTClass(typeid(SendJobHeader));
150 
151  TBufferFile xbuf(
152  TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
153  RootDebug tracer(10, 10);
154  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
155 
156  if (sd.get() == nullptr) {
157  throw cms::Exception("StreamTranslation", "Registry deserialization error")
158  << "Could not read the initial product registry list\n";
159  }
160 
161  sd->initializeTransients();
162  return sd;
163  }
unsigned int uint32
Definition: MsgTools.h:13
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
double sd
TClass * getTClass(const std::type_info &ti)
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void edm::StreamerInputSource::fillDescription ( ParameterSetDescription &  description)
static
bool edm::StreamerInputSource::isBufferLZMA ( unsigned char const *  inputBuffer,
unsigned int  inputSize 
)

Detect if buffer starts with "XZ\0" which means it is compressed in LZMA format

Definition at line 378 of file StreamerInputSource.cc.

Referenced by deserializeEvent().

378  {
379  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
380  return true;
381  else
382  return false;
383  }
bool edm::StreamerInputSource::isBufferZSTD ( unsigned char const *  inputBuffer,
unsigned int  inputSize 
)

Detect if buffer starts with "ZS\0" which means it is compressed in ZStandard format

Definition at line 430 of file StreamerInputSource.cc.

Referenced by deserializeEvent().

430  {
431  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
432  return true;
433  else
434  return false;
435  }
void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry &  reg,
BranchIDListHelper &  branchIDListHelper,
ThinnedAssociationsHelper &  thinnedHelper,
bool  subsequent 
)
static

Definition at line 61 of file StreamerInputSource.cc.

References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), Exception, FDEBUG, edm::ProductRegistry::frozen(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, AlCaHLTBitMon_QueryRunRegistry::string, edm::SendJobHeader::thinnedAssociationsHelper(), edm::BranchIDListHelper::updateFromInput(), edm::ProductRegistry::updateFromInput(), and edm::ThinnedAssociationsHelper::updateFromPrimaryInput().

Referenced by deserializeAndMergeWithRegistry().

65  {
66  SendDescs const& descs = header.descs();
67 
68  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
69 
70  if (subsequent) {
71  ProductRegistry pReg;
72  pReg.updateFromInput(descs);
73  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
74  if (!mergeInfo.empty()) {
75  throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
76  }
77  branchIDListHelper.updateFromInput(header.branchIDLists());
78  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
79  } else {
80  declareStreamers(descs);
81  buildClassCache(descs);
83  if (!reg.frozen()) {
84  reg.updateFromInput(descs);
85  }
86  branchIDListHelper.updateFromInput(header.branchIDLists());
87  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
88  }
89  }
std::vector< BranchDescription > SendDescs
static void declareStreamers(SendDescs const &descs)
static void buildClassCache(SendDescs const &descs)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void loadExtraClasses()
Definition: ClassFiller.cc:33
void edm::StreamerInputSource::read ( EventPrincipal &  eventPrincipal)
override private virtual

Implements edm::RawInputSource.

Definition at line 281 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::Principal::adjustIndexesAfterProductRegistryAddition(), edm::Principal::adjustToNewProductRegistry(), edm::InputSource::branchIDListHelper(), eventPrincipalHolder_, FDEBUG, edm::EventPrincipal::fillEventPrincipal(), photons_cff::ids, eostools::move(), edm::InputSource::processHistoryRegistry(), edm::InputSource::productRegistry(), edm::EventPrincipal::putOnRead(), sendEvent_, edm::Principal::size(), edm::EventPrincipal::streamID(), streamToEventPrincipalHolders_, and edm::StreamID::value().

Referenced by edmIntegrityCheck.PublishToFileSystem::get().

281  {
283  eventPrincipal.adjustIndexesAfterProductRegistryAddition();
284  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
285  assert(eventOK);
287  }
288  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
289  BranchListIndexes indexes(sendEvent_->branchListIndexes());
290  branchIDListHelper()->fixBranchListIndexes(indexes);
291  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
292 
293  //We now know which eventPrincipal to use and we can reuse the slot in
294  // streamToEventPrincipalHolders to own the memory
295  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
296  if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
297  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
298  }
299  streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
300 
301  // no process name list handling
302 
303  SendProds& sps = sendEvent_->products();
304  for (auto& spitem : sps) {
305  FDEBUG(10) << "check prodpair" << std::endl;
306  if (spitem.desc() == nullptr)
307  throw cms::Exception("StreamTranslation", "Empty Provenance");
308  FDEBUG(5) << "Prov:"
309  << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
310  << spitem.desc()->branchID() << std::endl;
311 
312  BranchDescription const branchDesc(*spitem.desc());
313  // This ProductProvenance constructor inserts into the entry description registry
314  if (spitem.parents()) {
315  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
316  if (spitem.prod() != nullptr) {
317  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
318  eventPrincipal.putOnRead(
319  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), &productProvenance);
320  FDEBUG(10) << "addproduct done" << std::endl;
321  } else {
322  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
323  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), &productProvenance);
324  FDEBUG(10) << "addproduct empty done" << std::endl;
325  }
326  } else {
327  ProductProvenance const* productProvenance = nullptr;
328  if (spitem.prod() != nullptr) {
329  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
330  eventPrincipal.putOnRead(
331  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
332  FDEBUG(10) << "addproduct done" << std::endl;
333  } else {
334  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
335  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
336  FDEBUG(10) << "addproduct empty done" << std::endl;
337  }
338  }
339  spitem.clear();
340  }
341 
342  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
343  }
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:155
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< BranchListIndex > BranchListIndexes
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
std::vector< StreamedProduct > SendProds
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:151
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
#define FDEBUG(lev)
Definition: DebugMacros.h:19
def move(src, dest)
Definition: eostools.py:511
std::unique_ptr< FileBlock > edm::StreamerInputSource::readFile_ ( )
override private virtual

Reimplemented from edm::InputSource.

Definition at line 59 of file StreamerInputSource.cc.

59 { return std::make_unique<FileBlock>(); }
void edm::StreamerInputSource::resetAfterEndRun ( )
protected

Definition at line 457 of file StreamerInputSource.cc.

References edm::InputSource::eventCached(), edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), and edm::InputSource::resetRunAuxiliary().

457  {
458  // called from an online streamer source to reset after a stop command
459  // so an enable command will work
462  assert(!eventCached());
463  reset();
464  }
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:337
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
void reset() const
Definition: InputSource.h:345
bool eventCached() const
Definition: InputSource.h:356
void edm::StreamerInputSource::setRun ( RunNumber_t  r)
override private virtual

Reimplemented from edm::InputSource.

Definition at line 466 of file StreamerInputSource.cc.

References Exception, and edm::errors::LogicError.

466  {
467  // Need to define a dummy setRun here or else the InputSource::setRun is called
468  // if we have a source inheriting from this and wants to define a setRun method
469  throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
470  << "Run number cannot be modified for this type of Input Source\n"
471  << "Contact a Storage Manager Developer\n";
472  }
unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
)
static

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 353 of file StreamerInputSource.cc.

References Exception, FDEBUG, and runTheMatrix::ret.

Referenced by deserializeEvent().

356  {
357  unsigned long origSize = expectedFullSize;
358  unsigned long uncompressedSize = expectedFullSize * 1.1;
359  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
360  outputBuffer.resize(uncompressedSize);
361  int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize); // do not need compression level
362  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
363  if (ret == Z_OK) {
364  // check the length against original uncompressed length
365  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
366  if (origSize != uncompressedSize) {
367  // we throw an error and return without event! null pointer
368  throw cms::Exception("StreamDeserialization", "Uncompression error")
369  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
370  }
371  } else {
372  // we throw an error and return without event! null pointer
373  throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
374  }
375  return (unsigned int)uncompressedSize;
376  }
ret
prodAgent to be discontinued
#define FDEBUG(lev)
Definition: DebugMacros.h:19
unsigned int edm::StreamerInputSource::uncompressBufferLZMA ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize,
bool  hasHeader = true 
)
static

Definition at line 385 of file StreamerInputSource.cc.

References Exception, FDEBUG, createfilelist::int, and mitigatedMETSequence_cff::U.

Referenced by deserializeEvent().

389  {
390  unsigned long origSize = expectedFullSize;
391  unsigned long uncompressedSize = expectedFullSize * 1.1;
392  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
393  outputBuffer.resize(uncompressedSize);
394 
395  lzma_stream stream = LZMA_STREAM_INIT;
396  lzma_ret returnStatus;
397 
398  returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
399  if (returnStatus != LZMA_OK) {
400  throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
401  << "Error code = " << returnStatus << "\n ";
402  }
403 
404  size_t hdrSize = hasHeader ? 4 : 0;
405  stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
406  stream.avail_in = (size_t)(inputSize - hdrSize);
407  stream.next_out = (uint8_t*)&outputBuffer[0];
408  stream.avail_out = (size_t)uncompressedSize;
409 
410  returnStatus = lzma_code(&stream, LZMA_FINISH);
411  if (returnStatus != LZMA_STREAM_END) {
412  lzma_end(&stream);
413  throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
414  << "Error code = " << returnStatus << "\n ";
415  }
416  lzma_end(&stream);
417 
418  uncompressedSize = (unsigned int)stream.total_out;
419 
420  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
421  if (origSize != uncompressedSize) {
422  // we throw an error and return without event! null pointer
423  throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
424  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
425  }
426 
427  return uncompressedSize;
428  }
#define FDEBUG(lev)
Definition: DebugMacros.h:19
unsigned int edm::StreamerInputSource::uncompressBufferZSTD ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize,
bool  hasHeader = true 
)
static

Definition at line 437 of file StreamerInputSource.cc.

References Exception, FDEBUG, and runTheMatrix::ret.

Referenced by deserializeEvent().

441  {
442  unsigned long uncompressedSize = expectedFullSize * 1.1;
443  FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
444  outputBuffer.resize(uncompressedSize);
445 
446  size_t hdrSize = hasHeader ? 4 : 0;
447  size_t ret = ZSTD_decompress(
448  (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
449 
450  if (ZSTD_isError(ret)) {
451  throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
452  << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
453  }
454  return (unsigned int)ret;
455  }
ret
prodAgent to be discontinued
#define FDEBUG(lev)
Definition: DebugMacros.h:19

Member Data Documentation

bool edm::StreamerInputSource::adjustEventToNewProductRegistry_
private

Definition at line 122 of file StreamerInputSource.h.

Referenced by deserializeAndMergeWithRegistry(), and read().

std::vector<unsigned char> edm::StreamerInputSource::dest_
private

Definition at line 117 of file StreamerInputSource.h.

Referenced by deserializeEvent().

edm::propagate_const<std::unique_ptr<EventPrincipalHolder> > edm::StreamerInputSource::eventPrincipalHolder_
private

Definition at line 120 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

std::string edm::StreamerInputSource::processName_
private

Definition at line 124 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

unsigned int edm::StreamerInputSource::protocolVersion_
private

Definition at line 125 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

edm::propagate_const<std::unique_ptr<SendEvent> > edm::StreamerInputSource::sendEvent_
private

Definition at line 119 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

std::vector<edm::propagate_const<std::unique_ptr<EventPrincipalHolder> > > edm::StreamerInputSource::streamToEventPrincipalHolders_
private

Definition at line 121 of file StreamerInputSource.h.

Referenced by read().

edm::propagate_const<TClass*> edm::StreamerInputSource::tc_
private

Definition at line 116 of file StreamerInputSource.h.

Referenced by deserializeEvent().

TBufferFile edm::StreamerInputSource::xbuf_
private

Definition at line 118 of file StreamerInputSource.h.

Referenced by deserializeEvent().