CMS 3D CMS Logo

List of all members | Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Private Member Functions | Private Attributes
edm::StreamerInputSource Class Reference

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
edm::RawInputSource edm::InputSource edm::ProductRegistryHelper dqmservices::DQMStreamerReader edm::StreamerFileReader edm::StreamerInputModule< Producer >

Classes

class  EventPrincipalHolder
 

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
void deserializeEvent (EventMsgView const &eventView)
 
std::unique_ptr< SendJobHeader > deserializeRegistry (InitMsgView const &initView)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~StreamerInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry. More...
 
std::shared_ptr< BranchIDListHelper const > branchIDListHelper () const
 Accessors for branchIDListHelper. More...
 
std::shared_ptr< BranchIDListHelper > & branchIDListHelper ()
 
void closeFile (FileBlock *, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
 InputSource (InputSource const &)=delete
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 Advances the source to the next item. More...
 
InputSource & operator= (InputSource const &)=delete
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryRegistry const & processHistoryRegistry () const
 Accessors for process history registry. More...
 
ProcessHistoryRegistry & processHistoryRegistry ()
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessors for product registry. More...
 
std::shared_ptr< ProductRegistry > & productRegistry ()
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi) More...
 
void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run) More...
 
void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event. More...
 
bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event. More...
 
std::unique_ptr< FileBlock > readFile ()
 Read next file. More...
 
void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi) More...
 
std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run) More...
 
std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
std::pair< SharedResourcesAcquirer *, std::recursive_mutex * > resourceSharedWithDelayedReader ()
 Returns nullptr if no resource shared between the Source and a DelayedReader. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper () const
 Accessors for thinnedAssociationsHelper. More...
 
std::shared_ptr< ThinnedAssociationsHelper > & thinnedAssociationsHelper ()
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource () noexcept(false)
 Destructor. More...
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

void resetAfterEndRun ()
 
- Protected Member Functions inherited from edm::RawInputSource
virtual bool checkNextEvent ()=0
 
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
bool newLumi () const
 
bool newRun () const
 
ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 
ProductRegistry & productRegistryUpdate ()
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
void setEventCached ()
 Presumably sets the eventCached_ flag that eventCached() const {return eventCached_;} reports (the original brief description was garbled). More...
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
virtual void skip (int offset)
 
ItemType state () const
 

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Private Member Functions

virtual void read (EventPrincipal &eventPrincipal)
 
virtual std::unique_ptr< FileBlock > readFile_ ()
 
virtual void setRun (RunNumber_t r)
 

Private Attributes

bool adjustEventToNewProductRegistry_
 
std::vector< unsigned char > dest_
 
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
 
std::string processName_
 
unsigned int protocolVersion_
 
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
 
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
 
edm::propagate_const< TClass * > tc_
 
TBufferFile xbuf_
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef ProductRegistryHelper::TypeLabelList TypeLabelList
 
- Public Attributes inherited from edm::InputSource
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> postEventReadFromSourceSignal_
 
signalslot::Signal< void(StreamContext const &, ModuleCallingContext const &)> preEventReadFromSourceSignal_
 

Detailed Description

Definition at line 32 of file StreamerInputSource.h.

Constructor & Destructor Documentation

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
)
explicit

Definition at line 43 of file StreamerInputSource.cc.

45  :
46  RawInputSource(pset, desc),
47  tc_(getTClass(typeid(SendEvent))),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
53  processName_(),
55  }
RawInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
TClass * getTClass(const std::type_info &ti)
edm::propagate_const< TClass * > tc_
const int init_size
std::vector< unsigned char > dest_
edm::StreamerInputSource::~StreamerInputSource ( )
virtual

Definition at line 57 of file StreamerInputSource.cc.

57 {}

Member Function Documentation

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs)
static protected

Definition at line 119 of file StreamerInputSource.cc.

References edm::doBuildRealData(), FDEBUG, AlCaHLTBitMon_QueryRunRegistry::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

119  {
120  for(auto const& item : descs) {
121  //pi->init();
122  std::string const real_name = wrappedClassName(item.className());
123  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
124  doBuildRealData(real_name);
125  }
126  }
void doBuildRealData(const std::string &name)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string wrappedClassName(std::string const &iFullName)
void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs)
static protected

Definition at line 98 of file StreamerInputSource.cc.

References FDEBUG, edm::loadCap(), AlCaHLTBitMon_QueryRunRegistry::string, edm::throwMissingDictionariesException(), and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

98  {
99  std::vector<std::string> missingDictionaries;
100  std::vector<std::string> branchNamesForMissing;
101  std::vector<std::string> producedTypes;
102  for (auto const& item : descs) {
103  //pi->init();
104  std::string const real_name = wrappedClassName(item.className());
105  FDEBUG(6) << "declare: " << real_name << std::endl;
106  if (!loadCap(real_name, missingDictionaries)) {
107  branchNamesForMissing.emplace_back(item.branchName());
108  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
109  }
110  }
111  if (!missingDictionaries.empty()) {
112  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
113  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
114  }
115  }
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string wrappedClassName(std::string const &iFullName)
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

Deserializes the specified init message into a SendJobHeader object and merges registries.

Definition at line 182 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::InputSource::branchIDListHelper(), deserializeRegistry(), edm::pset::Registry::insertMapped(), edm::pset::Registry::instance(), mergeIntoRegistry(), edm::InputSource::productRegistryUpdate(), muonDTDigis_cfi::pset, sd, and edm::InputSource::thinnedAssociationsHelper().

Referenced by edm::StreamerFileReader::checkNextEvent(), edm::StreamerInputModule< Producer >::checkNextEvent(), dqmservices::DQMStreamerReader::checkNextEvent(), dqmservices::DQMStreamerReader::openFile_(), edm::StreamerFileReader::reset_(), and edm::StreamerInputModule< Producer >::StreamerInputModule().

182  {
183  std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
185  if (subsequent) {
187  }
188  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
189  pset::Registry& psetRegistry = *pset::Registry::instance();
190  for (auto const& item : psetMap) {
191  ParameterSet pset(item.second.pset());
192  pset.setID(item.first);
193  psetRegistry.insertMapped(pset);
194  }
195  }
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:344
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:172
double sd
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:176
static Registry * instance()
Definition: Registry.cc:12
void edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message.

Definition at line 201 of file StreamerInputSource.cc.

References cms::Adler32(), EventMsgView::adler32_chksum(), EventMsgView::code(), popcon2dropbox::copy(), dest_, Header::EVENT, EventMsgView::event(), EventMsgView::eventData(), EventMsgView::eventLength(), eventPrincipalHolder_, Exception, FDEBUG, EventMsgView::hostName(), edm::Timestamp::invalidTimestamp(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), EventMsgView::origDataSize(), edm::InputSource::processHistoryRegistryForUpdate(), edm::ProcessHistoryRegistry::registerProcessHistory(), edm::InputSource::resetLuminosityBlockAuxiliary(), EventMsgView::run(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), sendEvent_, edm::InputSource::setEventCached(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::RunAuxiliary::setProcessHistoryID(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), tc_, uncompressBuffer(), and xbuf_.

Referenced by edm::StreamerFileReader::checkNextEvent(), dqmservices::DQMStreamerReader::checkNextEvent(), and edm::StreamerInputModule< Producer >::checkNextEvent().

201  {
202  if(eventView.code() != Header::EVENT)
203  throw cms::Exception("StreamTranslation","Event deserialization error")
204  << "received wrong message type: expected EVENT, got "
205  << eventView.code() << "\n";
206  FDEBUG(9) << "Decode event: "
207  << eventView.event() << " "
208  << eventView.run() << " "
209  << eventView.size() << " "
210  << eventView.adler32_chksum() << " "
211  << eventView.eventLength() << " "
212  << eventView.eventData()
213  << std::endl;
214  // uncompress if we need to
215  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
216  // need to get rid of this when 090 MTCC streamers are gotten rid of
217  unsigned long origsize = eventView.origDataSize();
218  unsigned long dest_size; //(should be >= eventView.origDataSize())
219 
220  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
221  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
222  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
223  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
224  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
225  // skip event (based on option?) or throw exception?
226  throw cms::Exception("StreamDeserialization", "Checksum error")
227  << " chksum from event = " << adler32_chksum << " from header = "
228  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
229  }
230  if(origsize != 78 && origsize != 0) {
231  // compressed
232  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
233  eventView.eventLength(), dest_, origsize);
234  } else { // not compressed
235  // we need to copy anyway the buffer as we are using dest in xbuf
236  dest_size = eventView.eventLength();
237  dest_.resize(dest_size);
238  unsigned char* pos = (unsigned char*) &dest_[0];
239  unsigned char const* from = (unsigned char const*) eventView.eventData();
240  std::copy(from,from+dest_size,pos);
241  }
242  //TBuffer xbuf(TBuffer::kRead, dest_size,
243  // (char const*) &dest[0],kFALSE);
244  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
245  // (char const*) eventView.eventData(),kFALSE);
246  xbuf_.Reset();
247  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
248  RootDebug tracer(10,10);
249 
250  //We do not yet know which EventPrincipal we will use, therefore
251  // we are using a new EventPrincipalHolder as a proxy. We need to
252  // make a new one instead of reusing the same one becuase when running
253  // multi-threaded there will be multiple EventPrincipals being used
254  // simultaneously.
255  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
257  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
259 
260  if(sendEvent_.get() == nullptr) {
261  throw cms::Exception("StreamTranslation","Event deserialization error")
262  << "got a null event from input stream\n";
263  }
264  processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
265 
266  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
267  if(runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
268  RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
269  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
270  setRunAuxiliary(runAuxiliary);
272  }
273  if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
274  LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
275  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
276  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
277  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
278  }
279  setEventCached();
280  }
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
bool registerProcessHistory(ProcessHistory const &processHistory)
void setRefCoreStreamer(bool resetAll=false)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:534
#define FDEBUG(lev)
Definition: DebugMacros.h:18
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:351
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Presumably sets the eventCached_ flag that eventCached() const {return eventCached_;} reports (the original brief description was garbled).
Definition: InputSource.h:376
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:540
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:252
edm::propagate_const< TClass * > tc_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:359
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:345
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:347
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:255
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
std::vector< unsigned char > dest_
std::unique_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView)

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 133 of file StreamerInputSource.cc.

References cms::Adler32(), InitMsgView::adler32_chksum(), InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), protocolVersion_, and sd.

Referenced by deserializeAndMergeWithRegistry().

133  {
134  if(initView.code() != Header::INIT)
135  throw cms::Exception("StreamTranslation","Registry deserialization error")
136  << "received wrong message type: expected INIT, got "
137  << initView.code() << "\n";
138 
139  //Get the process name and store if for Protocol version 4 and above.
140  if (initView.protocolVersion() > 3) {
141 
142  processName_ = initView.processName();
143  protocolVersion_ = initView.protocolVersion();
144 
145  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
146  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
147  }
148 
149  // calculate the adler32 checksum
150  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(),initView.descLength());
151  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
152  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
153  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
154  if((uint32)adler32_chksum != initView.adler32_chksum()) {
155  // skip event (based on option?) or throw exception?
156  throw cms::Exception("StreamDeserialization", "Checksum error")
157  << " chksum from registry data = " << adler32_chksum << " from header = "
158  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
159  }
160 
161  TClass* desc = getTClass(typeid(SendJobHeader));
162 
163  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
164  const_cast<char*>((char const*)initView.descData()),kFALSE);
165  RootDebug tracer(10,10);
166  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
167 
168  if(sd.get() == nullptr) {
169  throw cms::Exception("StreamTranslation","Registry deserialization error")
170  << "Could not read the initial product registry list\n";
171  }
172 
173  sd->initializeTransients();
174  return sd;
175  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18
unsigned int uint32
Definition: MsgTools.h:13
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
double sd
TClass * getTClass(const std::type_info &ti)
void edm::StreamerInputSource::fillDescription ( ParameterSetDescription description)
static

Definition at line 439 of file StreamerInputSource.cc.

References edm::RawInputSource::fillDescription().

Referenced by edm::StreamerFileReader::fillDescriptions(), and dqmservices::DQMStreamerReader::fillDescriptions().

439  {
441  }
static void fillDescription(ParameterSetDescription &description)
void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry reg,
BranchIDListHelper branchIDListHelper,
ThinnedAssociationsHelper thinnedHelper,
bool  subsequent 
)
static

Definition at line 66 of file StreamerInputSource.cc.

References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), Exception, FDEBUG, edm::ProductRegistry::frozen(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, AlCaHLTBitMon_QueryRunRegistry::string, edm::SendJobHeader::thinnedAssociationsHelper(), edm::BranchIDListHelper::updateFromInput(), edm::ProductRegistry::updateFromInput(), and edm::ThinnedAssociationsHelper::updateFromPrimaryInput().

Referenced by deserializeAndMergeWithRegistry().

70  {
71 
72  SendDescs const& descs = header.descs();
73 
74  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
75 
76  if (subsequent) {
77  ProductRegistry pReg;
78  pReg.updateFromInput(descs);
79  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
80  if (!mergeInfo.empty()) {
81  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
82  }
83  branchIDListHelper.updateFromInput(header.branchIDLists());
84  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
85  } else {
86  declareStreamers(descs);
87  buildClassCache(descs);
89  if(!reg.frozen()) {
90  reg.updateFromInput(descs);
91  }
92  branchIDListHelper.updateFromInput(header.branchIDLists());
93  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
94  }
95  }
std::vector< BranchDescription > SendDescs
#define FDEBUG(lev)
Definition: DebugMacros.h:18
static void declareStreamers(SendDescs const &descs)
static void buildClassCache(SendDescs const &descs)
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:172
void loadExtraClasses()
Definition: ClassFiller.cc:34
void edm::StreamerInputSource::read ( EventPrincipal eventPrincipal)
private virtual

Implements edm::RawInputSource.

Definition at line 283 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::Principal::adjustIndexesAfterProductRegistryAddition(), edm::Principal::adjustToNewProductRegistry(), edm::InputSource::branchIDListHelper(), eventPrincipalHolder_, FDEBUG, edm::EventPrincipal::fillEventPrincipal(), eostools::move(), edm::InputSource::processHistoryRegistry(), edm::InputSource::productRegistry(), edm::EventPrincipal::putOnRead(), sendEvent_, edm::Principal::size(), edm::EventPrincipal::streamID(), streamToEventPrincipalHolders_, and edm::StreamID::value().

Referenced by edmIntegrityCheck.PublishToFileSystem::get().

283  {
285  eventPrincipal.adjustIndexesAfterProductRegistryAddition();
286  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
287  assert(eventOK);
289  }
290  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
291  BranchListIndexes indexes(sendEvent_->branchListIndexes());
292  branchIDListHelper()->fixBranchListIndexes(indexes);
293  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
294 
295  //We now know which eventPrincipal to use and we can reuse the slot in
296  // streamToEventPrincipalHolders to own the memory
297  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
298  if(streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() +1) {
299  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() +1);
300  }
301  streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
302 
303  // no process name list handling
304 
305  SendProds& sps = sendEvent_->products();
306  for(auto& spitem : sps) {
307  FDEBUG(10) << "check prodpair" << std::endl;
308  if(spitem.desc() == nullptr)
309  throw cms::Exception("StreamTranslation","Empty Provenance");
310  FDEBUG(5) << "Prov:"
311  << " " << spitem.desc()->className()
312  << " " << spitem.desc()->productInstanceName()
313  << " " << spitem.desc()->branchID()
314  << std::endl;
315 
316  BranchDescription const branchDesc(*spitem.desc());
317  // This ProductProvenance constructor inserts into the entry description registry
318  if (spitem.parents()) {
319  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
320  if(spitem.prod() != nullptr) {
321  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
322  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), &productProvenance);
323  FDEBUG(10) << "addproduct done" << std::endl;
324  } else {
325  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
326  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), &productProvenance);
327  FDEBUG(10) << "addproduct empty done" << std::endl;
328  }
329  } else {
330  ProductProvenance const* productProvenance = nullptr;
331  if(spitem.prod() != nullptr) {
332  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
333  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
334  FDEBUG(10) << "addproduct done" << std::endl;
335  } else {
336  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
337  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
338  FDEBUG(10) << "addproduct empty done" << std::endl;
339  }
340  }
341  spitem.clear();
342  }
343 
344  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
345  }
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< BranchListIndex > BranchListIndexes
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
std::vector< StreamedProduct > SendProds
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:172
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:164
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
def move(src, dest)
Definition: eostools.py:510
std::unique_ptr< FileBlock > edm::StreamerInputSource::readFile_ ( )
private virtual

Reimplemented from edm::InputSource.

Definition at line 61 of file StreamerInputSource.cc.

61  {
62  return std::make_unique<FileBlock>();
63  }
void edm::StreamerInputSource::resetAfterEndRun ( )
protected

Definition at line 387 of file StreamerInputSource.cc.

References edm::InputSource::eventCached(), edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), and edm::InputSource::resetRunAuxiliary().

387  {
388  // called from an online streamer source to reset after a stop command
389  // so an enable command will work
392  assert(!eventCached());
393  reset();
394  }
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:355
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:359
void reset() const
Definition: InputSource.h:363
bool eventCached() const
Definition: InputSource.h:374
void edm::StreamerInputSource::setRun ( RunNumber_t  r)
private virtual

Reimplemented from edm::InputSource.

Definition at line 396 of file StreamerInputSource.cc.

References Exception, and edm::errors::LogicError.

396  {
397  // Need to define a dummy setRun here, or else InputSource::setRun is called
398  // if a source inheriting from this class wants to define a setRun method
399  throw Exception(errors::LogicError)
400  << "StreamerInputSource::setRun()\n"
401  << "Run number cannot be modified for this type of Input Source\n"
402  << "Contact a Storage Manager Developer\n";
403  }
unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
)
static

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 356 of file StreamerInputSource.cc.

References Exception, and FDEBUG.

Referenced by deserializeEvent().

359  {
360  unsigned long origSize = expectedFullSize;
361  unsigned long uncompressedSize = expectedFullSize*1.1;
362  FDEBUG(1) << "Uncompress: original size = " << origSize
363  << ", compressed size = " << inputSize
364  << std::endl;
365  outputBuffer.resize(uncompressedSize);
366  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
367  inputBuffer, inputSize); // do not need compression level
368  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
369  if(ret == Z_OK) {
370  // check the length against original uncompressed length
371  FDEBUG(10) << " original size = " << origSize << " final size = "
372  << uncompressedSize << std::endl;
373  if(origSize != uncompressedSize) {
374  // size mismatch: throw an exception, so no event is returned
375  throw cms::Exception("StreamDeserialization","Uncompression error")
376  << "mismatch event lengths should be" << origSize << " got "
377  << uncompressedSize << "\n";
378  }
379  } else {
380  // uncompress() failed: throw an exception, so no event is returned
381  throw cms::Exception("StreamDeserialization","Uncompression error")
382  << "Error code = " << ret << "\n ";
383  }
384  return (unsigned int) uncompressedSize;
385  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18

Member Data Documentation

bool edm::StreamerInputSource::adjustEventToNewProductRegistry_
private

Definition at line 104 of file StreamerInputSource.h.

Referenced by deserializeAndMergeWithRegistry(), and read().

std::vector<unsigned char> edm::StreamerInputSource::dest_
private

Definition at line 99 of file StreamerInputSource.h.

Referenced by deserializeEvent().

edm::propagate_const<std::unique_ptr<EventPrincipalHolder> > edm::StreamerInputSource::eventPrincipalHolder_
private

Definition at line 102 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

std::string edm::StreamerInputSource::processName_
private

Definition at line 106 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

unsigned int edm::StreamerInputSource::protocolVersion_
private

Definition at line 107 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

edm::propagate_const<std::unique_ptr<SendEvent> > edm::StreamerInputSource::sendEvent_
private

Definition at line 101 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

std::vector<edm::propagate_const<std::unique_ptr<EventPrincipalHolder> > > edm::StreamerInputSource::streamToEventPrincipalHolders_
private

Definition at line 103 of file StreamerInputSource.h.

Referenced by read().

edm::propagate_const<TClass*> edm::StreamerInputSource::tc_
private

Definition at line 98 of file StreamerInputSource.h.

Referenced by deserializeEvent().

TBufferFile edm::StreamerInputSource::xbuf_
private

Definition at line 100 of file StreamerInputSource.h.

Referenced by deserializeEvent().