CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | Static Private Attributes
edm::StreamerInputSource Class Referenceabstract

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
edm::InputSource edm::ProductRegistryHelper edm::EventStreamHttpReader edm::StreamerFileReader edm::StreamerInputModule< Producer >

Classes

class  ProductGetter
 

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
EventPrincipaldeserializeEvent (EventMsgView const &eventView)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~StreamerInputSource ()
 
- Public Member Functions inherited from edm::InputSource
boost::shared_ptr
< ActivityRegistry
actReg () const
 Accessor for Activity Registry. More...
 
void closeFile (boost::shared_ptr< FileBlock >, bool cleaningUpAfterException)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (boost::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int markLumi ()
 Mark lumi as read. More...
 
int markRun ()
 Mark run as read. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 
bool primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
boost::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndCacheLumi (bool merge, HistoryAppender &historyAppender)
 Read next luminosity block. More...
 
void readAndCacheRun (bool merge, HistoryAppender &historyAppender)
 Read next run. More...
 
EventPrincipalreadEvent (boost::shared_ptr< LuminosityBlockPrincipal > lbCache)
 
EventPrincipalreadEvent (EventID const &)
 Read a specific event. More...
 
boost::shared_ptr< FileBlockreadFile ()
 Read next file. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxilary. More...
 
boost::shared_ptr< RunAuxiliaryreadRunAuxiliary ()
 Read next run Auxiliary. More...
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
boost::shared_ptr< RunAuxiliaryrunAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static std::auto_ptr
< SendJobHeader
deserializeRegistry (InitMsgView const &initView)
 
static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

void resetAfterEndRun ()
 
void setEndRun ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
EventPrincipaleventPrincipalCache ()
 
boost::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
PrincipalCache const & principalCache () const
 
PrincipalCacheprincipalCache ()
 
ProductRegistryproductRegistryUpdate () const
 
void reset () const
 
void resetLuminosityBlockAuxiliary () const
 
void resetRunAuxiliary () const
 
boost::shared_ptr
< RunPrincipal > const 
runPrincipal () const
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
ItemType state () const
 

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Protected Attributes

bool inputFileTransitionsEachEvent_
 

Private Member Functions

virtual ItemType getNextItemType ()
 
virtual EventPrincipalread ()=0
 
virtual EventPrincipalreadEvent_ ()
 
virtual boost::shared_ptr
< FileBlock
readFile_ ()
 
virtual boost::shared_ptr
< LuminosityBlockAuxiliary
readLuminosityBlockAuxiliary_ ()
 
virtual boost::shared_ptr
< RunAuxiliary
readRunAuxiliary_ ()
 
virtual void setRun (RunNumber_t r)
 

Private Attributes

std::vector< unsigned char > dest_
 
bool eventCached_
 
bool newLumi_
 
bool newRun_
 
ProductGetter productGetter_
 
bool runEndingFlag_
 
TClass * tc_
 
TBufferFile xbuf_
 

Static Private Attributes

static std::string processName_
 
static unsigned int protocolVersion_
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 

Detailed Description

Definition at line 29 of file StreamerInputSource.h.

Constructor & Destructor Documentation

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
)
explicit

Definition at line 49 of file StreamerInputSource.cc.

51  :
52  InputSource(pset, desc),
53  // The default value for the following parameter get defined in at least one derived class
54  // where it has a different default value.
56  pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
57  newRun_(true),
58  newLumi_(true),
59  eventCached_(false),
60  tc_(getTClass(typeid(SendEvent))),
62  xbuf_(TBuffer::kRead, init_size),
63  runEndingFlag_(false),
64  productGetter_() {
65  }
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
const int init_size
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:54
std::vector< unsigned char > dest_
edm::StreamerInputSource::~StreamerInputSource ( )
virtual

Definition at line 67 of file StreamerInputSource.cc.

67 {}

Member Function Documentation

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs)
staticprotected

Definition at line 115 of file StreamerInputSource.cc.

References edm::doBuildRealData(), alignCSCRings::e, FDEBUG, i, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

115  {
116  SendDescs::const_iterator i(descs.begin()), e(descs.end());
117 
118  for(; i != e; ++i) {
119  //pi->init();
120  std::string const real_name = wrappedClassName(i->className());
121  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
122  doBuildRealData(real_name);
123  }
124  }
int i
Definition: DBlmapReader.cc:9
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:34
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string wrappedClassName(std::string const &iFullName)
void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs)
staticprotected

Definition at line 102 of file StreamerInputSource.cc.

References alignCSCRings::e, FDEBUG, i, edm::loadCap(), and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

102  {
103  SendDescs::const_iterator i(descs.begin()), e(descs.end());
104 
105  for(; i != e; ++i) {
106  //pi->init();
107  std::string const real_name = wrappedClassName(i->className());
108  FDEBUG(6) << "declare: " << real_name << std::endl;
109  loadCap(real_name);
110  }
111  }
int i
Definition: DBlmapReader.cc:9
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string wrappedClassName(std::string const &iFullName)
void loadCap(const std::string &name)
Definition: ClassFiller.cc:22
void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

Deserializes the specified init message into a SendJobHeader object and merges registries.

Definition at line 241 of file StreamerInputSource.cc.

References edm::PrincipalCache::adjustEventToNewProductRegistry(), deserializeRegistry(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::insertMapped(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), mergeIntoRegistry(), edm::InputSource::principalCache(), edm::InputSource::productRegistry(), edm::InputSource::productRegistryUpdate(), sd, and edm::ParameterSet::setID().

Referenced by edm::StreamerFileReader::read(), edm::EventStreamHttpReader::readHeader(), edm::StreamerFileReader::StreamerFileReader(), and edm::StreamerInputModule< Producer >::StreamerInputModule().

241  {
242  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
243  ProcessConfigurationVector const& pcv = sd->processConfigurations();
244  mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
245  if (subsequent) {
247  }
248  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
249  pset::Registry& psetRegistry = *pset::Registry::instance();
250  for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
251  ParameterSet pset(i->second.pset());
252  pset.setID(i->first);
253  psetRegistry.insertMapped(pset);
254  }
256  for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
257  pcReg.insertMapped(*it);
258  }
259  }
int i
Definition: DBlmapReader.cc:9
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
static ThreadSafeRegistry * instance()
detail::ThreadSafeRegistry< ParameterSetID, ParameterSet, ProcessParameterSetIDCache > Registry
Definition: Registry.h:37
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
edm::detail::ThreadSafeRegistry< edm::ProcessConfigurationID, edm::ProcessConfiguration > ProcessConfigurationRegistry
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, bool subsequent)
PrincipalCache const & principalCache() const
Definition: InputSource.h:312
double sd
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:295
boost::shared_ptr< ProductRegistry const > productRegistry() const
Accessor for product registry.
Definition: InputSource.h:155
static std::auto_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
EventPrincipal * edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message into an EventPrincipal object.

Definition at line 265 of file StreamerInputSource.cc.

References cms::Adler32(), EventMsgView::adler32_chksum(), dtNoiseDBValidation_cfg::cerr, EventMsgView::code(), filterCSVwithJSON::copy, dest_, Header::EVENT, EventMsgView::event(), eventCached_, EventMsgView::eventData(), EventMsgView::eventLength(), edm::InputSource::eventPrincipalCache(), edm::hlt::Exception, FDEBUG, edm::EventPrincipal::fillEventPrincipal(), edm::BranchIDListHelper::fixBranchListIndexes(), Capri::details::from(), EventMsgView::hostName(), edm::detail::ThreadSafeRegistry< KEY, T, E >::insertMapped(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::Timestamp::invalidTimestamp(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), newLumi_, newRun_, EventMsgView::origDataSize(), pos, productGetter_, edm::EventPrincipal::putOnRead(), edm::InputSource::resetLuminosityBlockAuxiliary(), EventMsgView::run(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), sd, edm::StreamerInputSource::ProductGetter::setEventPrincipal(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::RunAuxiliary::setProcessHistoryID(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), edm::Principal::size(), tc_, uncompressBuffer(), and xbuf_.

Referenced by edm::StreamerFileReader::read(), and edm::EventStreamHttpReader::read().

265  {
266  if(eventView.code() != Header::EVENT)
267  throw cms::Exception("StreamTranslation","Event deserialization error")
268  << "received wrong message type: expected EVENT, got "
269  << eventView.code() << "\n";
270  FDEBUG(9) << "Decode event: "
271  << eventView.event() << " "
272  << eventView.run() << " "
273  << eventView.size() << " "
274  << eventView.adler32_chksum() << " "
275  << eventView.eventLength() << " "
276  << eventView.eventData()
277  << std::endl;
278  EventSourceSentry(*this);
279  // uncompress if we need to
280  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
281  // need to get rid of this when 090 MTCC streamers are gotten rid of
282  unsigned long origsize = eventView.origDataSize();
283  unsigned long dest_size; //(should be >= eventView.origDataSize())
284 
285  uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
286  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
287  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
288  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
289  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
290  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
291  << " chksum from event = " << adler32_chksum << " from header = "
292  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
293  // skip event (based on option?) or throw exception?
294  }
295  if(origsize != 78 && origsize != 0) {
296  // compressed
297  dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
298  eventView.eventLength(), dest_, origsize);
299  } else { // not compressed
300  // we need to copy anyway the buffer as we are using dest in xbuf
301  dest_size = eventView.eventLength();
302  dest_.resize(dest_size);
303  unsigned char* pos = (unsigned char*) &dest_[0];
304  unsigned char* from = (unsigned char*) eventView.eventData();
305  std::copy(from,from+dest_size,pos);
306  }
307  //TBuffer xbuf(TBuffer::kRead, dest_size,
308  // (char*) &dest[0],kFALSE);
309  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
310  // (char*) eventView.eventData(),kFALSE);
311  xbuf_.Reset();
312  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
313  RootDebug tracer(10,10);
314 
316  std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
318 
319  if(sd.get()==0) {
320  throw cms::Exception("StreamTranslation","Event deserialization error")
321  << "got a null event from input stream\n";
322  }
323  ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
324 
325  FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
326  if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
327  newRun_ = newLumi_ = true;
328  RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
329  runAuxiliary->setProcessHistoryID(sd->processHistory().id());
330  setRunAuxiliary(runAuxiliary);
332  }
333  if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
334  LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
335  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
336  luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
337  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
338  newLumi_ = true;
339  }
340 
341  boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
342  boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
344  eventPrincipalCache()->fillEventPrincipal(sd->aux(), boost::shared_ptr<LuminosityBlockPrincipal>(), ids, indexes);
346  eventCached_ = true;
347 
348  // no process name list handling
349 
350  SendProds & sps = sd->products();
351  for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
352  FDEBUG(10) << "check prodpair" << std::endl;
353  if(spi->desc() == 0)
354  throw cms::Exception("StreamTranslation","Empty Provenance");
355  FDEBUG(5) << "Prov:"
356  << " " << spi->desc()->className()
357  << " " << spi->desc()->productInstanceName()
358  << " " << spi->desc()->branchID()
359  << std::endl;
360 
361  ConstBranchDescription branchDesc(*spi->desc());
362  // This ProductProvenance constructor inserts into the entry description registry
363  ProductProvenance productProvenance(spi->branchID(), *spi->parents());
364 
365  if(spi->prod() != 0) {
366  FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
367  eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
368  FDEBUG(10) << "addgroup done" << std::endl;
369  } else {
370  FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
371  eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
372  FDEBUG(10) << "addgroup empty done" << std::endl;
373  }
374  spi->clear();
375  }
376 
377  FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
378 
379  return eventPrincipalCache();
380  }
size_t size() const
Definition: Principal.cc:146
void setRefCoreStreamer(bool resetAll=false)
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:238
static ThreadSafeRegistry * instance()
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:606
#define FDEBUG(lev)
Definition: DebugMacros.h:18
static void fixBranchListIndexes(BranchListIndexes &indexes)
std::vector< EventSelectionID > EventSelectionIDVector
void resetLuminosityBlockAuxiliary() const
Definition: InputSource.h:302
std::vector< BranchListIndex > BranchListIndexes
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:298
std::vector< StreamedProduct > SendProds
void fillEventPrincipal(EventAuxiliary const &aux, boost::shared_ptr< LuminosityBlockPrincipal > lbp, boost::shared_ptr< EventSelectionIDVector > eventSelectionIDs=boost::shared_ptr< EventSelectionIDVector >(), boost::shared_ptr< BranchListIndexes > branchListIndexes=boost::shared_ptr< BranchListIndexes >(), boost::shared_ptr< BranchMapper > mapper=boost::shared_ptr< BranchMapper >(new BranchMapper), DelayedReader *reader=0)
unsigned int uint32
Definition: MsgTools.h:13
static std::string from(" from ")
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:612
static Timestamp const & invalidTimestamp()
Definition: Timestamp.cc:83
double sd
void putOnRead(ConstBranchDescription const &bd, void const *product, ProductProvenance const &productProvenance)
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:235
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:297
bool insertMapped(value_type const &v)
EventPrincipal * eventPrincipalCache()
Definition: InputSource.cc:139
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
std::vector< unsigned char > dest_
std::auto_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView)
static

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 193 of file StreamerInputSource.cc.

References cms::Adler32(), InitMsgView::adler32_chksum(), dtNoiseDBValidation_cfg::cerr, InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), edm::hlt::Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), protocolVersion_, and sd.

Referenced by deserializeAndMergeWithRegistry(), and edm::readHeaderFromStream().

193  {
194  if(initView.code() != Header::INIT)
195  throw cms::Exception("StreamTranslation","Registry deserialization error")
196  << "received wrong message type: expected INIT, got "
197  << initView.code() << "\n";
198 
199  //Get the process name and store if for Protocol version 4 and above.
200  if (initView.protocolVersion() > 3) {
201 
202  processName_ = initView.processName();
203  protocolVersion_ = initView.protocolVersion();
204 
205  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
206  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
207  }
208 
209  // calculate the adler32 checksum
210  uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
211  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
212  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
213  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
214  if((uint32)adler32_chksum != initView.adler32_chksum()) {
215  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
216  << " chksum from registry data = " << adler32_chksum << " from header = "
217  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
218  // skip event (based on option?) or throw exception?
219  }
220 
221  TClass* desc = getTClass(typeid(SendJobHeader));
222 
223  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
224  (char*)initView.descData(),kFALSE);
225  RootDebug tracer(10,10);
226  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
227 
228  if(sd.get()==0) {
229  throw cms::Exception("StreamTranslation","Registry deserialization error")
230  << "Could not read the initial product registry list\n";
231  }
232 
233  return sd;
234  }
static std::string processName_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
static unsigned int protocolVersion_
unsigned int uint32
Definition: MsgTools.h:13
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
double sd
void edm::StreamerInputSource::fillDescription ( ParameterSetDescription description)
static

Definition at line 461 of file StreamerInputSource.cc.

References edm::InputSource::fillDescription().

Referenced by edm::StreamerFileReader::fillDescriptions().

461  {
462  // The default value for "inputFileTransitionsEachEvent" gets defined in the derived class
463  // as it depends on the derived class. So, we cannot redefine it here.
465  }
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:129
InputSource::ItemType edm::StreamerInputSource::getNextItemType ( )
privatevirtual

Implements edm::InputSource.

Definition at line 154 of file StreamerInputSource.cc.

References eventCached_, inputFileTransitionsEachEvent_, edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::InputSource::luminosityBlockAuxiliary(), newLumi_, newRun_, read(), edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::resetRunAuxiliary(), edm::InputSource::runAuxiliary(), and runEndingFlag_.

154  {
155  if (runEndingFlag_) {
156  return IsStop;
157  }
158  if(newRun_ && runAuxiliary()) {
159  return IsRun;
160  }
162  return IsLumi;
163  }
164  if (eventCached_) {
165  return IsEvent;
166  }
170  }
171  read();
172  if (!eventCached_) {
173  return IsStop;
174  } else {
175  runEndingFlag_ = false;
177  return IsFile;
178  }
179  }
180  if(newRun_) {
181  return IsRun;
182  } else if(newLumi_) {
183  return IsLumi;
184  }
185  return IsEvent;
186  }
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:238
void resetLuminosityBlockAuxiliary() const
Definition: InputSource.h:302
virtual EventPrincipal * read()=0
void resetRunAuxiliary() const
Definition: InputSource.h:299
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:235
void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry reg,
bool  subsequent 
)
static

Definition at line 76 of file StreamerInputSource.cc.

References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), edm::hlt::Exception, FDEBUG, edm::fillProductRegistryTransients(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, edm::SendJobHeader::processConfigurations(), edm::BranchIDListHelper::updateFromInput(), and edm::ProductRegistry::updateFromInput().

Referenced by deserializeAndMergeWithRegistry(), and edm::getRegFromFile().

76  {
77 
78  SendDescs const& descs = header.descs();
79 
80  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
81 
82  if (subsequent) {
83  ProductRegistry pReg;
84  pReg.updateFromInput(descs);
85  fillProductRegistryTransients(header.processConfigurations(), pReg);
86  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
87  if (!mergeInfo.empty()) {
88  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
89  }
90  BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
91  } else {
92  declareStreamers(descs);
93  buildClassCache(descs);
95  reg.updateFromInput(descs);
96  fillProductRegistryTransients(header.processConfigurations(), reg);
97  BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
98  }
99  }
std::vector< BranchDescription > SendDescs
static bool updateFromInput(BranchIDLists const &bidlists, std::string const &fileName)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void fillProductRegistryTransients(std::vector< ProcessConfiguration > const &pcVec, ProductRegistry const &preg, bool okToRegister=false)
static void declareStreamers(SendDescs const &descs)
static void buildClassCache(SendDescs const &descs)
void loadExtraClasses()
Definition: ClassFiller.cc:47
virtual EventPrincipal* edm::StreamerInputSource::read ( )
privatepure virtual
EventPrincipal * edm::StreamerInputSource::readEvent_ ( )
privatevirtual

Implements edm::InputSource.

Definition at line 144 of file StreamerInputSource.cc.

References eventCached_, edm::InputSource::eventPrincipalCache(), edm::InputSource::luminosityBlockPrincipal(), newLumi_, newRun_, and edm::EventPrincipal::setLuminosityBlockPrincipal().

144  {
145  assert(!newRun_);
146  assert(!newLumi_);
147  assert(eventCached_);
148  eventCached_ = false;
150  return eventPrincipalCache();
151  }
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
boost::shared_ptr< LuminosityBlockPrincipal > const luminosityBlockPrincipal() const
Definition: InputSource.cc:281
EventPrincipal * eventPrincipalCache()
Definition: InputSource.cc:139
boost::shared_ptr< FileBlock > edm::StreamerInputSource::readFile_ ( )
privatevirtual

Reimplemented from edm::InputSource.

Definition at line 71 of file StreamerInputSource.cc.

71  {
72  return boost::shared_ptr<FileBlock>(new FileBlock);
73  }
boost::shared_ptr< LuminosityBlockAuxiliary > edm::StreamerInputSource::readLuminosityBlockAuxiliary_ ( )
privatevirtual

Implements edm::InputSource.

Definition at line 135 of file StreamerInputSource.cc.

References edm::InputSource::luminosityBlockAuxiliary(), newLumi_, and newRun_.

135  {
136  assert(!newRun_);
137  assert(newLumi_);
138  assert(luminosityBlockAuxiliary());
139  newLumi_ = false;
140  return luminosityBlockAuxiliary();
141  }
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:238
boost::shared_ptr< RunAuxiliary > edm::StreamerInputSource::readRunAuxiliary_ ( )
privatevirtual

Implements edm::InputSource.

Definition at line 127 of file StreamerInputSource.cc.

References newRun_, and edm::InputSource::runAuxiliary().

127  {
128  assert(newRun_);
129  assert(runAuxiliary());
130  newRun_ = false;
131  return runAuxiliary();
132  }
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:235
void edm::StreamerInputSource::resetAfterEndRun ( )
protected

Definition at line 426 of file StreamerInputSource.cc.

References eventCached_, newLumi_, newRun_, edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::resetRunAuxiliary(), and runEndingFlag_.

426  {
427  // called from an online streamer source to reset after a stop command
428  // so an enable command will work
431  newRun_ = newLumi_ = true;
432  assert(!eventCached_);
433  reset();
434  runEndingFlag_ = false;
435  }
void resetLuminosityBlockAuxiliary() const
Definition: InputSource.h:302
void resetRunAuxiliary() const
Definition: InputSource.h:299
void reset() const
Definition: InputSource.h:305
void edm::StreamerInputSource::setEndRun ( )
inlineprotected

Definition at line 61 of file StreamerInputSource.h.

References runEndingFlag_.

Referenced by edm::EventStreamHttpReader::read().

void edm::StreamerInputSource::setRun ( RunNumber_t  r)
privatevirtual

Reimplemented from edm::InputSource.

Definition at line 437 of file StreamerInputSource.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

437  {
438  // Need to define a dummy setRun here or else the InputSource::setRun is called
439  // if we have a source inheriting from this and wants to define a setRun method
441  << "StreamerInputSource::setRun()\n"
442  << "Run number cannot be modified for this type of Input Source\n"
443  << "Contact a Storage Manager Developer\n";
444  }
unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
)
static

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 391 of file StreamerInputSource.cc.

References dtNoiseDBValidation_cfg::cerr, edm::hlt::Exception, FDEBUG, and run_regression::ret.

Referenced by edm::StreamDQMDeserializer::deserializeDQMEvent(), and deserializeEvent().

394  {
395  unsigned long origSize = expectedFullSize;
396  unsigned long uncompressedSize = expectedFullSize*1.1;
397  FDEBUG(1) << "Uncompress: original size = " << origSize
398  << ", compressed size = " << inputSize
399  << std::endl;
400  outputBuffer.resize(uncompressedSize);
401  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
402  inputBuffer, inputSize); // do not need compression level
403  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
404  if(ret == Z_OK) {
405  // check the length against original uncompressed length
406  FDEBUG(10) << " original size = " << origSize << " final size = "
407  << uncompressedSize << std::endl;
408  if(origSize != uncompressedSize) {
409  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
410  << origSize << " uncompress size = " << uncompressedSize << std::endl;
411  // we throw an error and return without event! null pointer
412  throw cms::Exception("StreamDeserialization","Uncompression error")
413  << "mismatch event lengths should be" << origSize << " got "
414  << uncompressedSize << "\n";
415  }
416  } else {
417  // we throw an error and return without event! null pointer
418  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
419  << ret << std::endl;
420  throw cms::Exception("StreamDeserialization","Uncompression error")
421  << "Error code = " << ret << "\n ";
422  }
423  return (unsigned int) uncompressedSize;
424  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18

Member Data Documentation

std::vector<unsigned char> edm::StreamerInputSource::dest_
private

Definition at line 102 of file StreamerInputSource.h.

Referenced by deserializeEvent().

bool edm::StreamerInputSource::eventCached_
private
bool edm::StreamerInputSource::inputFileTransitionsEachEvent_
protected

Definition at line 64 of file StreamerInputSource.h.

Referenced by getNextItemType().

bool edm::StreamerInputSource::newLumi_
private
bool edm::StreamerInputSource::newRun_
private
std::string edm::StreamerInputSource::processName_
staticprivate

Definition at line 108 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

ProductGetter edm::StreamerInputSource::productGetter_
private

Definition at line 105 of file StreamerInputSource.h.

Referenced by deserializeEvent().

unsigned int edm::StreamerInputSource::protocolVersion_
staticprivate

Definition at line 109 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

bool edm::StreamerInputSource::runEndingFlag_
private

Definition at line 104 of file StreamerInputSource.h.

Referenced by getNextItemType(), resetAfterEndRun(), and setEndRun().

TClass* edm::StreamerInputSource::tc_
private

Definition at line 101 of file StreamerInputSource.h.

Referenced by deserializeEvent().

TBufferFile edm::StreamerInputSource::xbuf_
private

Definition at line 103 of file StreamerInputSource.h.

Referenced by deserializeEvent().