CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputSource Class Reference

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
edm::RawInputSource edm::InputSource dqmservices::DQMStreamerReader edm::StreamerFileReader edm::StreamerInputModule< Producer >

Classes

class  EventPrincipalHolder
 

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
void deserializeEvent (EventMsgView const &eventView)
 
std::unique_ptr< SendJobHeaderdeserializeRegistry (InitMsgView const &initView)
 
bool isBufferLZMA (unsigned char const *inputBuffer, unsigned int inputSize)
 
bool isBufferZSTD (unsigned char const *inputBuffer, unsigned int inputSize)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~StreamerInputSource () override
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
 ~RawInputSource () override
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistryactReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
virtual void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
virtual void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (InputSource const &)=delete
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
void issueReports (EventID const &eventID, StreamID streamID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliaryluminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSourceoperator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistryprocessHistoryRegistry ()
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
std::unique_ptr< FileBlockreadFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliaryreadLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
virtual void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
static unsigned int uncompressBufferLZMA (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
static unsigned int uncompressBufferZSTD (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

void resetAfterEndRun ()
 
- Protected Member Functions inherited from edm::RawInputSource
virtual Next checkNext ()=0
 
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
virtual void beginJob ()
 Begin protected makes it easier to do template programming. More...
 
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistryprocessHistoryRegistryForUpdate ()
 
ProductRegistryproductRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Called by the framework to merge or ached() const {return eventCached_;}. More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Private Member Functions

void read (EventPrincipal &eventPrincipal) override
 
void setRun (RunNumber_t r) override
 

Private Attributes

bool adjustEventToNewProductRegistry_
 
std::vector< unsigned char > dest_
 
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
 
std::string processName_
 
unsigned int protocolVersion_
 
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
 
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
 
edm::propagate_const< TClass * > tc_
 
TBufferFile xbuf_
 

Additional Inherited Members

- Public Types inherited from edm::RawInputSource
enum  Next { Next::kEvent, Next::kFile, Next::kStop }
 
- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 31 of file StreamerInputSource.h.

Constructor & Destructor Documentation

◆ StreamerInputSource()

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
)
explicit

Definition at line 45 of file StreamerInputSource.cc.

46  : RawInputSource(pset, desc),
47  tc_(getTClass(typeid(SendEvent))),
48  dest_(init_size),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
53  processName_(),
54  protocolVersion_(0U) {}

◆ ~StreamerInputSource()

edm::StreamerInputSource::~StreamerInputSource ( )
override

Definition at line 56 of file StreamerInputSource.cc.

56 {}

Member Function Documentation

◆ buildClassCache()

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs)
staticprotected

Definition at line 108 of file StreamerInputSource.cc.

108  {
109  for (auto const& item : descs) {
110  //pi->init();
111  std::string const real_name = wrappedClassName(item.className());
112  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
113  doBuildRealData(real_name);
114  }
115  }

References edm::doBuildRealData(), FDEBUG, B2GTnPMonitor_cfi::item, AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

◆ declareStreamers()

void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs)
staticprotected

Definition at line 89 of file StreamerInputSource.cc.

89  {
90  std::vector<std::string> missingDictionaries;
91  std::vector<std::string> branchNamesForMissing;
92  std::vector<std::string> producedTypes;
93  for (auto const& item : descs) {
94  //pi->init();
95  std::string const real_name = wrappedClassName(item.className());
96  FDEBUG(6) << "declare: " << real_name << std::endl;
97  if (!loadCap(real_name, missingDictionaries)) {
98  branchNamesForMissing.emplace_back(item.branchName());
99  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
100  }
101  }
102  if (!missingDictionaries.empty()) {
103  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
104  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
105  }
106  }

References FDEBUG, B2GTnPMonitor_cfi::item, edm::loadCap(), AlCaHLTBitMon_QueryRunRegistry::string, edm::throwMissingDictionariesException(), and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

◆ deserializeAndMergeWithRegistry()

void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

◆ deserializeEvent()

void edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message.

Definition at line 185 of file StreamerInputSource.cc.

185  {
186  if (eventView.code() != Header::EVENT)
187  throw cms::Exception("StreamTranslation", "Event deserialization error")
188  << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
189  FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
190  << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
191  << std::endl;
192  // uncompress if we need to
193  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
194  // need to get rid of this when 090 MTCC streamers are gotten rid of
195  unsigned long origsize = eventView.origDataSize();
196  unsigned long dest_size; //(should be >= eventView.origDataSize())
197 
198  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
199  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
200  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
201  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
202  if ((uint32)adler32_chksum != eventView.adler32_chksum()) {
203  // skip event (based on option?) or throw exception?
204  throw cms::Exception("StreamDeserialization", "Checksum error")
205  << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
206  << " host name = " << eventView.hostName() << std::endl;
207  }
208  if (origsize != 78 && origsize != 0) {
209  // compressed
210  if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
211  dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
212  eventView.eventLength(),
213  dest_,
214  origsize);
215  } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
216  dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
217  eventView.eventLength(),
218  dest_,
219  origsize);
220  } else
221  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
222  eventView.eventLength(),
223  dest_,
224  origsize);
225  } else { // not compressed
226  // we need to copy anyway the buffer as we are using dest in xbuf
227  dest_size = eventView.eventLength();
228  dest_.resize(dest_size);
229  unsigned char* pos = (unsigned char*)&dest_[0];
230  unsigned char const* from = (unsigned char const*)eventView.eventData();
231  std::copy(from, from + dest_size, pos);
232  }
233  //TBuffer xbuf(TBuffer::kRead, dest_size,
234  // (char const*) &dest[0],kFALSE);
235  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
236  // (char const*) eventView.eventData(),kFALSE);
237  xbuf_.Reset();
238  xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
239  RootDebug tracer(10, 10);
240 
241  //We do not yet know which EventPrincipal we will use, therefore
242  // we are using a new EventPrincipalHolder as a proxy. We need to
243  // make a new one instead of reusing the same one becuase when running
244  // multi-threaded there will be multiple EventPrincipals being used
245  // simultaneously.
246  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
248  {
249  std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
251  ;
252  });
253  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
254  }
255 
256  if (sendEvent_.get() == nullptr) {
257  throw cms::Exception("StreamTranslation", "Event deserialization error")
258  << "got a null event from input stream\n";
259  }
261 
262  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
263  if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
264  runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
265  RunAuxiliary* runAuxiliary =
266  new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
267  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
270  }
271  if (!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
272  LuminosityBlockAuxiliary* luminosityBlockAuxiliary = new LuminosityBlockAuxiliary(
273  runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
274  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
276  }
277  setEventCached();
278  }

References cms::Adler32(), EventMsgView::adler32_chksum(), EventMsgView::code(), filterCSVwithJSON::copy, dest_, Header::EVENT, EventMsgView::event(), EventMsgView::eventData(), EventMsgView::eventLength(), eventPrincipalHolder_, Exception, FDEBUG, edm::get(), EventMsgView::hostName(), edm::Timestamp::invalidTimestamp(), isBufferLZMA(), isBufferZSTD(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), EventMsgView::origDataSize(), edm::InputSource::processHistoryRegistryForUpdate(), edm::ProcessHistoryRegistry::registerProcessHistory(), edm::InputSource::resetLuminosityBlockAuxiliary(), EventMsgView::run(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), sendEvent_, edm::InputSource::setEventCached(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), tc_, uncompressBuffer(), uncompressBufferLZMA(), uncompressBufferZSTD(), and xbuf_.

Referenced by edm::StreamerFileReader::checkNext(), and dqmservices::DQMStreamerReader::checkNext().

◆ deserializeRegistry()

std::unique_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView)

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 121 of file StreamerInputSource.cc.

121  {
122  if (initView.code() != Header::INIT)
123  throw cms::Exception("StreamTranslation", "Registry deserialization error")
124  << "received wrong message type: expected INIT, got " << initView.code() << "\n";
125 
126  //Get the process name and store if for Protocol version 4 and above.
127  if (initView.protocolVersion() > 3) {
128  processName_ = initView.processName();
129  protocolVersion_ = initView.protocolVersion();
130 
131  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
132  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
133  }
134 
135  // calculate the adler32 checksum
136  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
137  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
138  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
139  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
140  if ((uint32)adler32_chksum != initView.adler32_chksum()) {
141  // skip event (based on option?) or throw exception?
142  throw cms::Exception("StreamDeserialization", "Checksum error")
143  << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
144  << " host name = " << initView.hostName() << std::endl;
145  }
146 
147  TClass* desc = getTClass(typeid(SendJobHeader));
148 
149  TBufferFile xbuf(
150  TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
151  RootDebug tracer(10, 10);
152  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
153 
154  if (sd.get() == nullptr) {
155  throw cms::Exception("StreamTranslation", "Registry deserialization error")
156  << "Could not read the initial product registry list\n";
157  }
158 
159  sd->initializeTransients();
160  return sd;
161  }

References cms::Adler32(), InitMsgView::adler32_chksum(), InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), protocolVersion_, and sd.

Referenced by deserializeAndMergeWithRegistry().

◆ fillDescription()

void edm::StreamerInputSource::fillDescription ( ParameterSetDescription description)
static

◆ isBufferLZMA()

bool edm::StreamerInputSource::isBufferLZMA ( unsigned char const *  inputBuffer,
unsigned int  inputSize 
)

Detect if buffer starts with "XZ\0" which means it is compressed in LZMA format

Definition at line 379 of file StreamerInputSource.cc.

379  {
380  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
381  return true;
382  else
383  return false;
384  }

Referenced by deserializeEvent().

◆ isBufferZSTD()

bool edm::StreamerInputSource::isBufferZSTD ( unsigned char const *  inputBuffer,
unsigned int  inputSize 
)

Detect if buffer starts with "Z\0" which means it is compressed in ZStandard format

Definition at line 431 of file StreamerInputSource.cc.

431  {
432  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
433  return true;
434  else
435  return false;
436  }

Referenced by deserializeEvent().

◆ mergeIntoRegistry()

void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry reg,
BranchIDListHelper branchIDListHelper,
ThinnedAssociationsHelper thinnedHelper,
bool  subsequent 
)
static

Definition at line 59 of file StreamerInputSource.cc.

63  {
64  SendDescs const& descs = header.descs();
65 
66  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
67 
68  if (subsequent) {
69  ProductRegistry pReg;
70  pReg.updateFromInput(descs);
71  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
72  if (!mergeInfo.empty()) {
73  throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
74  }
75  branchIDListHelper.updateFromInput(header.branchIDLists());
76  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
77  } else {
78  declareStreamers(descs);
79  buildClassCache(descs);
81  if (!reg.frozen()) {
82  reg.updateFromInput(descs);
83  }
84  branchIDListHelper.updateFromInput(header.branchIDLists());
85  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
86  }
87  }

References edm::InputSource::branchIDListHelper(), buildClassCache(), declareStreamers(), Exception, FDEBUG, edm::ProductRegistry::frozen(), RecoTauValidation_cfi::header, edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, AlCaHLTBitMon_QueryRunRegistry::string, edm::ProductRegistry::updateFromInput(), and edm::ThinnedAssociationsHelper::updateFromPrimaryInput().

Referenced by deserializeAndMergeWithRegistry().

◆ read()

void edm::StreamerInputSource::read ( EventPrincipal eventPrincipal)
overrideprivatevirtual

Implements edm::RawInputSource.

Definition at line 280 of file StreamerInputSource.cc.

280  {
282  eventPrincipal.adjustIndexesAfterProductRegistryAddition();
283  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
284  assert(eventOK);
286  }
287  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
288  BranchListIndexes indexes(sendEvent_->branchListIndexes());
289  branchIDListHelper()->fixBranchListIndexes(indexes);
290  auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
291  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
292 
293  //We now know which eventPrincipal to use and we can reuse the slot in
294  // streamToEventPrincipalHolders to own the memory
295  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
296  if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
297  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
298  }
299  streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
300 
301  // no process name list handling
302 
303  SendProds& sps = sendEvent_->products();
304  for (auto& spitem : sps) {
305  FDEBUG(10) << "check prodpair" << std::endl;
306  if (spitem.desc() == nullptr)
307  throw cms::Exception("StreamTranslation", "Empty Provenance");
308  FDEBUG(5) << "Prov:"
309  << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
310  << spitem.desc()->branchID() << std::endl;
311 
312  BranchDescription const branchDesc(*spitem.desc());
313  // This ProductProvenance constructor inserts into the entry description registry
314  if (spitem.parents()) {
315  std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
316  if (spitem.prod() != nullptr) {
317  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
318  eventPrincipal.putOnRead(branchDesc,
319  std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
320  std::move(productProvenance));
321  FDEBUG(10) << "addproduct done" << std::endl;
322  } else {
323  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
324  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
325  FDEBUG(10) << "addproduct empty done" << std::endl;
326  }
327  } else {
328  std::optional<ProductProvenance> productProvenance;
329  if (spitem.prod() != nullptr) {
330  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
331  eventPrincipal.putOnRead(
332  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
333  FDEBUG(10) << "addproduct done" << std::endl;
334  } else {
335  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
336  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
337  FDEBUG(10) << "addproduct empty done" << std::endl;
338  }
339  }
340  spitem.clear();
341  }
342 
343  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
344  }

References adjustEventToNewProductRegistry_, edm::Principal::adjustIndexesAfterProductRegistryAddition(), edm::Principal::adjustToNewProductRegistry(), cms::cuda::assert(), edm::BranchDescription::branchID(), edm::InputSource::branchIDListHelper(), eventPrincipalHolder_, FDEBUG, edm::EventPrincipal::fillEventPrincipal(), edm::ProcessHistoryRegistry::getMapped(), eostools::move(), edm::InputSource::processHistoryRegistry(), edm::InputSource::productRegistry(), edm::EventPrincipal::putOnRead(), sendEvent_, edm::Principal::size(), edm::EventPrincipal::streamID(), streamToEventPrincipalHolders_, and edm::StreamID::value().

Referenced by edmIntegrityCheck.PublishToFileSystem::get().

◆ resetAfterEndRun()

void edm::StreamerInputSource::resetAfterEndRun ( )
protected

Definition at line 458 of file StreamerInputSource.cc.

458  {
459  // called from an online streamer source to reset after a stop command
460  // so an enable command will work
463  assert(!eventCached());
464  reset();
465  }

References cms::cuda::assert(), edm::InputSource::eventCached(), edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), and edm::InputSource::resetRunAuxiliary().

◆ setRun()

void edm::StreamerInputSource::setRun ( RunNumber_t  r)
overrideprivatevirtual

Reimplemented from edm::InputSource.

Definition at line 467 of file StreamerInputSource.cc.

467  {
468  // Need to define a dummy setRun here or else the InputSource::setRun is called
469  // if we have a source inheriting from this and wants to define a setRun method
470  throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
471  << "Run number cannot be modified for this type of Input Source\n"
472  << "Contact a Storage Manager Developer\n";
473  }

References Exception, and edm::errors::LogicError.

◆ uncompressBuffer()

unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
)
static

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 354 of file StreamerInputSource.cc.

357  {
358  unsigned long origSize = expectedFullSize;
359  unsigned long uncompressedSize = expectedFullSize * 1.1;
360  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
361  outputBuffer.resize(uncompressedSize);
362  int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize); // do not need compression level
363  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
364  if (ret == Z_OK) {
365  // check the length against original uncompressed length
366  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
367  if (origSize != uncompressedSize) {
368  // we throw an error and return without event! null pointer
369  throw cms::Exception("StreamDeserialization", "Uncompression error")
370  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
371  }
372  } else {
373  // we throw an error and return without event! null pointer
374  throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
375  }
376  return (unsigned int)uncompressedSize;
377  }

References Exception, FDEBUG, and runTheMatrix::ret.

Referenced by deserializeEvent().

◆ uncompressBufferLZMA()

unsigned int edm::StreamerInputSource::uncompressBufferLZMA ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize,
bool  hasHeader = true 
)
static

Definition at line 386 of file StreamerInputSource.cc.

390  {
391  unsigned long origSize = expectedFullSize;
392  unsigned long uncompressedSize = expectedFullSize * 1.1;
393  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
394  outputBuffer.resize(uncompressedSize);
395 
396  lzma_stream stream = LZMA_STREAM_INIT;
397  lzma_ret returnStatus;
398 
399  returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
400  if (returnStatus != LZMA_OK) {
401  throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
402  << "Error code = " << returnStatus << "\n ";
403  }
404 
405  size_t hdrSize = hasHeader ? 4 : 0;
406  stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
407  stream.avail_in = (size_t)(inputSize - hdrSize);
408  stream.next_out = (uint8_t*)&outputBuffer[0];
409  stream.avail_out = (size_t)uncompressedSize;
410 
411  returnStatus = lzma_code(&stream, LZMA_FINISH);
412  if (returnStatus != LZMA_STREAM_END) {
413  lzma_end(&stream);
414  throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
415  << "Error code = " << returnStatus << "\n ";
416  }
417  lzma_end(&stream);
418 
419  uncompressedSize = (unsigned int)stream.total_out;
420 
421  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
422  if (origSize != uncompressedSize) {
423  // we throw an error and return without event! null pointer
424  throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
425  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
426  }
427 
428  return uncompressedSize;
429  }

References Exception, FDEBUG, createfilelist::int, cms::cuda::stream, and mitigatedMETSequence_cff::U.

Referenced by deserializeEvent().

◆ uncompressBufferZSTD()

unsigned int edm::StreamerInputSource::uncompressBufferZSTD ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize,
bool  hasHeader = true 
)
static

Definition at line 438 of file StreamerInputSource.cc.

442  {
443  unsigned long uncompressedSize = expectedFullSize * 1.1;
444  FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
445  outputBuffer.resize(uncompressedSize);
446 
447  size_t hdrSize = hasHeader ? 4 : 0;
448  size_t ret = ZSTD_decompress(
449  (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
450 
451  if (ZSTD_isError(ret)) {
452  throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
453  << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
454  }
455  return (unsigned int)ret;
456  }

References Exception, FDEBUG, and runTheMatrix::ret.

Referenced by deserializeEvent().

Member Data Documentation

◆ adjustEventToNewProductRegistry_

bool edm::StreamerInputSource::adjustEventToNewProductRegistry_
private

Definition at line 120 of file StreamerInputSource.h.

Referenced by deserializeAndMergeWithRegistry(), and read().

◆ dest_

std::vector<unsigned char> edm::StreamerInputSource::dest_
private

Definition at line 115 of file StreamerInputSource.h.

Referenced by deserializeEvent().

◆ eventPrincipalHolder_

edm::propagate_const<std::unique_ptr<EventPrincipalHolder> > edm::StreamerInputSource::eventPrincipalHolder_
private

Definition at line 118 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

◆ processName_

std::string edm::StreamerInputSource::processName_
private

Definition at line 122 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

◆ protocolVersion_

unsigned int edm::StreamerInputSource::protocolVersion_
private

Definition at line 123 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

◆ sendEvent_

edm::propagate_const<std::unique_ptr<SendEvent> > edm::StreamerInputSource::sendEvent_
private

Definition at line 117 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

◆ streamToEventPrincipalHolders_

std::vector<edm::propagate_const<std::unique_ptr<EventPrincipalHolder> > > edm::StreamerInputSource::streamToEventPrincipalHolders_
private

Definition at line 119 of file StreamerInputSource.h.

Referenced by read().

◆ tc_

edm::propagate_const<TClass*> edm::StreamerInputSource::tc_
private

Definition at line 114 of file StreamerInputSource.h.

Referenced by deserializeEvent().

◆ xbuf_

TBufferFile edm::StreamerInputSource::xbuf_
private

Definition at line 116 of file StreamerInputSource.h.

Referenced by deserializeEvent().

runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:355
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::ProcessHistoryRegistry::registerProcessHistory
bool registerProcessHistory(ProcessHistory const &processHistory)
Definition: ProcessHistoryRegistry.cc:11
edm::StreamerInputSource::deserializeRegistry
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
Definition: StreamerInputSource.cc:121
edm::throwMissingDictionariesException
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
Definition: DictionaryTools.cc:193
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
edm::InputSource::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:165
edm::StreamerInputSource::tc_
edm::propagate_const< TClass * > tc_
Definition: StreamerInputSource.h:114
edm::errors::LogicError
Definition: EDMException.h:37
edm::doBuildRealData
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:22
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
pos
Definition: PixelAliasList.h:18
edm::BranchListIndexes
std::vector< BranchListIndex > BranchListIndexes
Definition: BranchListIndex.h:18
edm::StreamerInputSource::declareStreamers
static void declareStreamers(SendDescs const &descs)
Definition: StreamerInputSource.cc:89
cms::cuda::assert
assert(be >=bs)
edm::StreamerInputSource::uncompressBuffer
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
Definition: StreamerInputSource.cc:354
edm::StreamerInputSource::sendEvent_
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
Definition: StreamerInputSource.h:117
edm::StreamerInputSource::isBufferLZMA
bool isBufferLZMA(unsigned char const *inputBuffer, unsigned int inputSize)
Definition: StreamerInputSource.cc:379
edm::StreamerInputSource::buildClassCache
static void buildClassCache(SendDescs const &descs)
Definition: StreamerInputSource.cc:108
Header::INIT
Definition: MsgHeader.h:15
edm::InputSource::processHistoryRegistryForUpdate
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
edm::InputSource::productRegistry
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:151
uint32
unsigned int uint32
Definition: MsgTools.h:13
edm::StreamerInputSource::xbuf_
TBufferFile xbuf_
Definition: StreamerInputSource.h:116
edm::StreamerInputSource::streamToEventPrincipalHolders_
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
Definition: StreamerInputSource.h:119
edm::BranchDescription::Permissive
Definition: BranchDescription.h:36
edm::StreamerInputSource::protocolVersion_
unsigned int protocolVersion_
Definition: StreamerInputSource.h:123
edm::RawInputSource::RawInputSource
RawInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: RawInputSource.cc:17
Header::EVENT
Definition: MsgHeader.h:16
cms::Adler32
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
Definition: Adler32Calculator.cc:10
edm::InputSource::luminosityBlockAuxiliary
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
edm::StreamerInputSource::adjustEventToNewProductRegistry_
bool adjustEventToNewProductRegistry_
Definition: StreamerInputSource.h:120
edm::getTClass
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::StreamerInputSource::processName_
std::string processName_
Definition: StreamerInputSource.h:122
edm::InputSource::runAuxiliary
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
edm::InputSource::processHistoryRegistry
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:155
edm::InputSource::setRunAuxiliary
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
edm::InputSource::reset
void reset() const
Definition: InputSource.h:345
edm::InputSource::productRegistryUpdate
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
ParameterSet
Definition: Functions.h:16
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
edm::get
T const & get(Event const &event, InputTag const &tag) noexcept(false)
Definition: Event.h:669
createfilelist.int
int
Definition: createfilelist.py:10
edm::StreamerInputSource::uncompressBufferZSTD
static unsigned int uncompressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
Definition: StreamerInputSource.cc:438
edm::InputSource::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:442
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::wrappedClassName
std::string wrappedClassName(std::string const &iFullName)
Definition: WrappedClassName.cc:4
edm::StreamerInputSource::isBufferZSTD
bool isBufferZSTD(unsigned char const *inputBuffer, unsigned int inputSize)
Definition: StreamerInputSource.cc:431
edm::InputSource::setLuminosityBlockAuxiliary
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
edm::SendJobHeader::ParameterSetMap
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
Definition: StreamedProducts.h:104
edm::InputSource::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
edm::Timestamp::invalidTimestamp
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
edm::InputSource::eventCached
bool eventCached() const
Definition: InputSource.h:356
edm::StreamerInputSource::eventPrincipalHolder_
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
Definition: StreamerInputSource.h:118
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::loadCap
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
Definition: ClassFiller.cc:16
edm::StreamerInputSource::mergeIntoRegistry
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
Definition: StreamerInputSource.cc:59
Exception
Definition: hltDiff.cc:246
edm::StreamerInputSource::dest_
std::vector< unsigned char > dest_
Definition: StreamerInputSource.h:115
edm::ProcessHistoryRegistry::getMapped
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
Definition: ProcessHistoryRegistry.cc:29
edm::InputSource::setEventCached
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:358
edm::loadExtraClasses
void loadExtraClasses()
Definition: ClassFiller.cc:33
edm::setRefCoreStreamer
void setRefCoreStreamer(bool resetAll=false)
Definition: RefCoreStreamer.cc:83
edm::StreamerInputSource::uncompressBufferLZMA
static unsigned int uncompressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
Definition: StreamerInputSource.cc:386
edm::InputSource::resetLuminosityBlockAuxiliary
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
sd
double sd
Definition: CascadeWrapper.h:113
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
edm::InputSource::resetRunAuxiliary
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:337
cms::Exception
Definition: Exception.h:70
edm::RawInputSource::fillDescription
static void fillDescription(ParameterSetDescription &description)
Definition: RawInputSource.cc:105
edm::EventSelectionIDVector
std::vector< EventSelectionID > EventSelectionIDVector
Definition: EventSelectionID.h:16
edm::SendDescs
std::vector< BranchDescription > SendDescs
Definition: StreamedProducts.h:100
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
edm::SendProds
std::vector< StreamedProduct > SendProds
Definition: StreamedProducts.h:67
edm::InputSource::run
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:437