CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Classes | Public Member Functions | Static Public Member Functions | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | Static Private Attributes
edm::StreamerInputSource Class Reference abstract

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
edm::InputSource edm::ProductRegistryHelper edm::EventStreamHttpReader edm::StreamerFileReader edm::StreamerInputModule< Producer >

Classes

class  ProductGetter
 

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
EventPrincipal * deserializeEvent (EventMsgView const &eventView)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~StreamerInputSource ()
 
- Public Member Functions inherited from edm::InputSource
boost::shared_ptr
< ActivityRegistry >
actReg () const
 Accessor for Activity Registry. More...
 
void closeFile (boost::shared_ptr< FileBlock >)
 close current file More...
 
void doBeginJob ()
 Called by framework at beginning of job. More...
 
void doBeginLumi (LuminosityBlockPrincipal &lbp)
 Called by framework at beginning of lumi block. More...
 
void doBeginRun (RunPrincipal &rp)
 Called by framework at beginning of run. More...
 
void doEndJob ()
 Called by framework at end of job. More...
 
void doEndLumi (LuminosityBlockPrincipal &lbp)
 Called by framework at end of lumi block. More...
 
void doEndRun (RunPrincipal &rp)
 Called by framework at end of run. More...
 
void doPostForkReacquireResources (boost::shared_ptr< multicore::MessageReceiverForSource >)
 
void doPreForkReleaseResources ()
 Called by the framework before forking the process. More...
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)
 
 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor. More...
 
void issueReports (EventID const &eventID)
 issue an event report More...
 
LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary >
luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache. More...
 
int markLumi ()
 Mark lumi as read. More...
 
int markRun ()
 Mark run as read. More...
 
int maxEvents () const
 
int maxLuminosityBlocks () const
 
ModuleDescription const & moduleDescription () const
 Accessor for 'module' description. More...
 
ItemType nextItemType ()
 
bool const primary () const
 Accessor for primary input source flag. More...
 
ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration. More...
 
std::string const & processGUID () const
 Accessor for global process identifier. More...
 
ProcessHistoryID const & processHistoryID () const
 Accessor for the input process history ID of the current run. More...
 
ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs. More...
 
boost::shared_ptr
< ProductRegistry const > 
productRegistry () const
 Accessor for product registry. More...
 
bool randomAccess () const
 
void readAndCacheLumi ()
 Read next luminosity block. More...
 
void readAndCacheRun ()
 Read next run. More...
 
EventPrincipal * readEvent (boost::shared_ptr< LuminosityBlockPrincipal > lbCache)
 
EventPrincipal * readEvent (EventID const &)
 Read a specific event. More...
 
boost::shared_ptr< FileBlock > readFile ()
 Read next file. More...
 
boost::shared_ptr
< LuminosityBlockAuxiliary >
readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary. More...
 
boost::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary. More...
 
void registerProducts ()
 Register any produced products. More...
 
int remainingEvents () const
 
int remainingLuminosityBlocks () const
 
void repeat ()
 Reset the remaining number of events/lumis to the maximum number. More...
 
ProcessingController::ReverseState reverseState () const
 
void rewind ()
 Begin again at the first event. More...
 
RunNumber_t run () const
 Accessor for current run number. More...
 
boost::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache. More...
 
void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID. More...
 
void setRunNumber (RunNumber_t r)
 Set the run number. More...
 
void skipEvents (int offset)
 
Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source. More...
 
void wakeUp ()
 Wake up the input source. More...
 
virtual ~InputSource ()
 Destructor. More...
 

Static Public Member Functions

static std::auto_ptr
< SendJobHeader >
deserializeRegistry (InitMsgView const &initView)
 
static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 

Protected Member Functions

void resetAfterEndRun ()
 
void setEndRun ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
EventPrincipal *const eventPrincipalCache ()
 
boost::shared_ptr
< LuminosityBlockPrincipal >
const 
luminosityBlockPrincipal () const
 
PrincipalCache const & principalCache () const
 
PrincipalCache & principalCache ()
 
ProductRegistry & productRegistryUpdate () const
 
void reset () const
 
void resetLuminosityBlockAuxiliary () const
 
void resetRunAuxiliary () const
 
boost::shared_ptr
< RunPrincipal > const 
runPrincipal () const
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setLumiPrematurelyRead ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setRunPrematurelyRead ()
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source. More...
 
ItemType state () const
 

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Protected Attributes

bool inputFileTransitionsEachEvent_
 

Private Member Functions

virtual ItemType getNextItemType ()
 
virtual EventPrincipal * read ()=0
 
virtual EventPrincipal * readEvent_ ()
 
virtual boost::shared_ptr
< FileBlock >
readFile_ ()
 
virtual boost::shared_ptr
< LuminosityBlockAuxiliary >
readLuminosityBlockAuxiliary_ ()
 
virtual boost::shared_ptr
< RunAuxiliary >
readRunAuxiliary_ ()
 
virtual void setRun (RunNumber_t r)
 

Private Attributes

std::vector< unsigned char > dest_
 
bool eventCached_
 
bool newLumi_
 
bool newRun_
 
ProductGetter productGetter_
 
bool runEndingFlag_
 
TClass * tc_
 
TBufferFile xbuf_
 

Static Private Attributes

static std::string processName_
 
static unsigned int protocolVersion_
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef
ProductRegistryHelper::TypeLabelList 
TypeLabelList
 

Detailed Description

Definition at line 29 of file StreamerInputSource.h.

Constructor & Destructor Documentation

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
)
explicit

Definition at line 49 of file StreamerInputSource.cc.

51  :
52  InputSource(pset, desc),
53  // The default value for the following parameter get defined in at least one derived class
54  // where it has a different default value.
56  pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
57  newRun_(true),
58  newLumi_(true),
59  eventCached_(false),
60  tc_(getTClass(typeid(SendEvent))),
62  xbuf_(TBuffer::kRead, init_size),
63  runEndingFlag_(false),
64  productGetter_() {
65  }
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
const int init_size
InputSource(ParameterSet const &, InputSourceDescription const &)
Constructor.
Definition: InputSource.cc:74
std::vector< unsigned char > dest_
edm::StreamerInputSource::~StreamerInputSource ( )
virtual

Definition at line 67 of file StreamerInputSource.cc.

67 {}

Member Function Documentation

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs)
static protected

Definition at line 115 of file StreamerInputSource.cc.

References edm::doBuildRealData(), FDEBUG, i, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

115  {
116  SendDescs::const_iterator i(descs.begin()), e(descs.end());
117 
118  for(; i != e; ++i) {
119  //pi->init();
120  std::string const real_name = wrappedClassName(i->className());
121  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
122  doBuildRealData(real_name);
123  }
124  }
int i
Definition: DBlmapReader.cc:9
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:34
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string wrappedClassName(std::string const &iFullName)
void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs)
static protected

Definition at line 102 of file StreamerInputSource.cc.

References FDEBUG, i, edm::loadCap(), and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

102  {
103  SendDescs::const_iterator i(descs.begin()), e(descs.end());
104 
105  for(; i != e; ++i) {
106  //pi->init();
107  std::string const real_name = wrappedClassName(i->className());
108  FDEBUG(6) << "declare: " << real_name << std::endl;
109  loadCap(real_name);
110  }
111  }
int i
Definition: DBlmapReader.cc:9
#define FDEBUG(lev)
Definition: DebugMacros.h:18
std::string wrappedClassName(std::string const &iFullName)
void loadCap(const std::string &name)
Definition: ClassFiller.cc:22
void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

Deserializes the specified init message into a SendJobHeader object and merges registries.

Definition at line 240 of file StreamerInputSource.cc.

References edm::PrincipalCache::adjustEventToNewProductRegistry(), deserializeRegistry(), i, edm::detail::ThreadSafeRegistry< KEY, T, E >::insertMapped(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), mergeIntoRegistry(), edm::InputSource::principalCache(), edm::InputSource::productRegistry(), edm::InputSource::productRegistryUpdate(), and MultipleCompare::pset.

Referenced by edm::StreamerFileReader::read(), edm::EventStreamHttpReader::readHeader(), edm::StreamerFileReader::StreamerFileReader(), and edm::StreamerInputModule< Producer >::StreamerInputModule().

240  {
241  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
242  ProcessConfigurationVector const& pcv = sd->processConfigurations();
243  mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
244  if (subsequent) {
246  }
247  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
248  pset::Registry& psetRegistry = *pset::Registry::instance();
249  for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
250  ParameterSet pset(i->second.pset());
251  pset.setID(i->first);
252  psetRegistry.insertMapped(pset);
253  }
255  for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
256  pcReg.insertMapped(*it);
257  }
258  }
int i
Definition: DBlmapReader.cc:9
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
detail::ThreadSafeRegistry< ParameterSetID, ParameterSet, ProcessParameterSetIDCache > Registry
Definition: Registry.h:37
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
edm::detail::ThreadSafeRegistry< edm::ProcessConfigurationID, edm::ProcessConfiguration > ProcessConfigurationRegistry
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, bool subsequent)
PrincipalCache const & principalCache() const
Definition: InputSource.h:316
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:299
boost::shared_ptr< ProductRegistry const > productRegistry() const
Accessor for product registry.
Definition: InputSource.h:161
static std::auto_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
static ThreadSafeRegistry * instance()
EventPrincipal * edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message into an EventPrincipal object.

Definition at line 264 of file StreamerInputSource.cc.

References cms::Adler32(), EventMsgView::adler32_chksum(), printConversionInfo::aux, benchmark_cfg::cerr, EventMsgView::code(), filterCSVwithJSON::copy, dest_, Header::EVENT, EventMsgView::event(), eventCached_, EventMsgView::eventData(), EventMsgView::eventLength(), edm::InputSource::eventPrincipalCache(), edm::hlt::Exception, FDEBUG, edm::EventPrincipal::fillEventPrincipal(), Capri::details::from(), EventMsgView::hostName(), edm::detail::ThreadSafeRegistry< KEY, T, E >::insertMapped(), edm::detail::ThreadSafeRegistry< KEY, T, E >::instance(), edm::Timestamp::invalidTimestamp(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), edm::InputSource::luminosityBlockPrincipal(), newLumi_, newRun_, EventMsgView::origDataSize(), pos, productGetter_, edm::EventPrincipal::putOnRead(), edm::InputSource::readAndCacheLumi(), edm::InputSource::readAndCacheRun(), EventMsgView::run(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), edm::StreamerInputSource::ProductGetter::setEventPrincipal(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::InputSource::setLumiPrematurelyRead(), edm::RunAuxiliary::setProcessHistoryID(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), edm::InputSource::setRunPrematurelyRead(), EventMsgView::size(), edm::Principal::size(), tc_, uncompressBuffer(), and xbuf_.

Referenced by edm::StreamerFileReader::read(), and edm::EventStreamHttpReader::read().

264  {
265  if(eventView.code() != Header::EVENT)
266  throw cms::Exception("StreamTranslation","Event deserialization error")
267  << "received wrong message type: expected EVENT, got "
268  << eventView.code() << "\n";
269  FDEBUG(9) << "Decode event: "
270  << eventView.event() << " "
271  << eventView.run() << " "
272  << eventView.size() << " "
273  << eventView.adler32_chksum() << " "
274  << eventView.eventLength() << " "
275  << eventView.eventData()
276  << std::endl;
277  EventSourceSentry(*this);
278  // uncompress if we need to
279  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
280  // need to get rid of this when 090 MTCC streamers are gotten rid of
281  unsigned long origsize = eventView.origDataSize();
282  unsigned long dest_size; //(should be >= eventView.origDataSize())
283 
284  uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
285  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
286  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
287  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
288  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
289  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
290  << " chksum from event = " << adler32_chksum << " from header = "
291  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
292  // skip event (based on option?) or throw exception?
293  }
294  if(origsize != 78 && origsize != 0) {
295  // compressed
296  dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
297  eventView.eventLength(), dest_, origsize);
298  } else { // not compressed
299  // we need to copy anyway the buffer as we are using dest in xbuf
300  dest_size = eventView.eventLength();
301  dest_.resize(dest_size);
302  unsigned char* pos = (unsigned char*) &dest_[0];
303  unsigned char* from = (unsigned char*) eventView.eventData();
304  std::copy(from,from+dest_size,pos);
305  }
306  //TBuffer xbuf(TBuffer::kRead, dest_size,
307  // (char*) &dest[0],kFALSE);
308  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
309  // (char*) eventView.eventData(),kFALSE);
310  xbuf_.Reset();
311  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
312  RootDebug tracer(10,10);
313 
315  std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
317 
318  if(sd.get()==0) {
319  throw cms::Exception("StreamTranslation","Event deserialization error")
320  << "got a null event from input stream\n";
321  }
322  ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
323 
324  FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
325  if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
326  newRun_ = newLumi_ = true;
327  RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
328  runAuxiliary->setProcessHistoryID(sd->processHistory().id());
329  setRunAuxiliary(runAuxiliary);
330  readAndCacheRun();
332  }
333  if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
334  LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
335  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
336  luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
337  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
338  newLumi_ = true;
341  }
342 
343  boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
344  boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
345  std::auto_ptr<EventAuxiliary> aux(new EventAuxiliary(sd->aux()));
348  eventCached_ = true;
349 
350  // no process name list handling
351 
352  SendProds & sps = sd->products();
353  for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
354  FDEBUG(10) << "check prodpair" << std::endl;
355  if(spi->desc() == 0)
356  throw cms::Exception("StreamTranslation","Empty Provenance");
357  FDEBUG(5) << "Prov:"
358  << " " << spi->desc()->className()
359  << " " << spi->desc()->productInstanceName()
360  << " " << spi->desc()->branchID()
361  << std::endl;
362 
363  ConstBranchDescription branchDesc(*spi->desc());
364  // This ProductProvenance constructor inserts into the entry description registry
365  std::auto_ptr<ProductProvenance> productProvenance(
366  new ProductProvenance(spi->branchID(),
367  spi->status(),
368  *spi->parents()));
369 
370  if(spi->prod() != 0) {
371  std::auto_ptr<EDProduct> aprod(const_cast<EDProduct*>(spi->prod()));
372  FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
373  eventPrincipalCache()->putOnRead(branchDesc, aprod, productProvenance);
374  FDEBUG(10) << "addgroup done" << std::endl;
375  } else {
376  FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
377  eventPrincipalCache()->putOnRead(branchDesc, std::auto_ptr<EDProduct>(), productProvenance);
378  FDEBUG(10) << "addgroup empty done" << std::endl;
379  }
380  spi->clear();
381  }
382 
383  FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
384 
385  return eventPrincipalCache();
386  }
size_t size() const
Definition: Principal.cc:65
EventPrincipal *const eventPrincipalCache()
Definition: InputSource.cc:156
void readAndCacheLumi()
Read next luminosity block.
Definition: InputSource.cc:309
void setRunPrematurelyRead()
Definition: InputSource.h:321
void putOnRead(ConstBranchDescription const &bd, std::auto_ptr< EDProduct > edp, std::auto_ptr< ProductProvenance > productProvenance)
void setRefCoreStreamer(bool resetAll=false)
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:582
#define FDEBUG(lev)
Definition: DebugMacros.h:18
bool insertMapped(value_type const &v)
boost::shared_ptr< LuminosityBlockPrincipal > const luminosityBlockPrincipal() const
Definition: InputSource.cc:280
std::vector< EventSelectionID > EventSelectionIDVector
std::vector< BranchListIndex > BranchListIndexes
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:302
std::vector< StreamedProduct > SendProds
unsigned int uint32
Definition: MsgTools.h:13
static std::string from(" from ")
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:588
void fillEventPrincipal(std::auto_ptr< EventAuxiliary > aux, boost::shared_ptr< LuminosityBlockPrincipal > lbp, boost::shared_ptr< EventSelectionIDVector > eventSelectionIDs=boost::shared_ptr< EventSelectionIDVector >(new EventSelectionIDVector), boost::shared_ptr< BranchListIndexes > branchListIndexes=boost::shared_ptr< BranchListIndexes >(new BranchListIndexes), boost::shared_ptr< BranchMapper > mapper=boost::shared_ptr< BranchMapper >(new BranchMapper), boost::shared_ptr< DelayedReader > rtrv=boost::shared_ptr< DelayedReader >(new NoDelayedReader))
static Timestamp const & invalidTimestamp()
Definition: Timestamp.cc:83
void setLumiPrematurelyRead()
Definition: InputSource.h:322
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:301
static ThreadSafeRegistry * instance()
void readAndCacheRun()
Read next run.
Definition: InputSource.cc:285
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
std::vector< unsigned char > dest_
std::auto_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView)
static

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 192 of file StreamerInputSource.cc.

References cms::Adler32(), InitMsgView::adler32_chksum(), benchmark_cfg::cerr, InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), edm::hlt::Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), and protocolVersion_.

Referenced by deserializeAndMergeWithRegistry(), and edm::readHeaderFromStream().

192  {
193  if(initView.code() != Header::INIT)
194  throw cms::Exception("StreamTranslation","Registry deserialization error")
195  << "received wrong message type: expected INIT, got "
196  << initView.code() << "\n";
197 
198  //Get the process name and store if for Protocol version 4 and above.
199  if (initView.protocolVersion() > 3) {
200 
201  processName_ = initView.processName();
202  protocolVersion_ = initView.protocolVersion();
203 
204  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
205  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
206  }
207 
208  // calculate the adler32 checksum
209  uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
210  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
211  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
212  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
213  if((uint32)adler32_chksum != initView.adler32_chksum()) {
214  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
215  << " chksum from registry data = " << adler32_chksum << " from header = "
216  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
217  // skip event (based on option?) or throw exception?
218  }
219 
220  TClass* desc = getTClass(typeid(SendJobHeader));
221 
222  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
223  (char*)initView.descData(),kFALSE);
224  RootDebug tracer(10,10);
225  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
226 
227  if(sd.get()==0) {
228  throw cms::Exception("StreamTranslation","Registry deserialization error")
229  << "Could not read the initial product registry list\n";
230  }
231 
232  return sd;
233  }
static std::string processName_
#define FDEBUG(lev)
Definition: DebugMacros.h:18
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
static unsigned int protocolVersion_
unsigned int uint32
Definition: MsgTools.h:13
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
void edm::StreamerInputSource::fillDescription ( ParameterSetDescription & description)
static

Definition at line 467 of file StreamerInputSource.cc.

References edm::InputSource::fillDescription().

Referenced by edm::StreamerFileReader::fillDescriptions().

467  {
468  // The default value for "inputFileTransitionsEachEvent" gets defined in the derived class
469  // as it depends on the derived class. So, we cannot redefine it here.
471  }
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:146
InputSource::ItemType edm::StreamerInputSource::getNextItemType ( )
private virtual

Implements edm::InputSource.

Definition at line 153 of file StreamerInputSource.cc.

References eventCached_, inputFileTransitionsEachEvent_, edm::InputSource::IsEvent, edm::InputSource::IsFile, edm::InputSource::IsLumi, edm::InputSource::IsRun, edm::InputSource::IsStop, edm::InputSource::luminosityBlockAuxiliary(), newLumi_, newRun_, read(), edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::resetRunAuxiliary(), edm::InputSource::runAuxiliary(), and runEndingFlag_.

153  {
154  if (runEndingFlag_) {
155  return IsStop;
156  }
157  if(newRun_ && runAuxiliary()) {
158  return IsRun;
159  }
161  return IsLumi;
162  }
163  if (eventCached_) {
164  return IsEvent;
165  }
169  }
170  read();
171  if (!eventCached_) {
172  return IsStop;
173  } else {
174  runEndingFlag_ = false;
176  return IsFile;
177  }
178  }
179  if(newRun_) {
180  return IsRun;
181  } else if(newLumi_) {
182  return IsLumi;
183  }
184  return IsEvent;
185  }
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
void resetLuminosityBlockAuxiliary() const
Definition: InputSource.h:306
virtual EventPrincipal * read()=0
void resetRunAuxiliary() const
Definition: InputSource.h:303
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry & reg,
bool  subsequent 
)
static

Definition at line 76 of file StreamerInputSource.cc.

References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), edm::hlt::Exception, FDEBUG, edm::fillProductRegistryTransients(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, edm::SendJobHeader::processConfigurations(), edm::BranchIDListHelper::updateFromInput(), and edm::ProductRegistry::updateFromInput().

Referenced by deserializeAndMergeWithRegistry(), and edm::getRegFromFile().

76  {
77 
78  SendDescs const& descs = header.descs();
79 
80  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
81 
82  if (subsequent) {
83  ProductRegistry pReg;
84  pReg.updateFromInput(descs);
85  fillProductRegistryTransients(header.processConfigurations(), pReg);
86  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
87  if (!mergeInfo.empty()) {
88  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
89  }
90  BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
91  } else {
92  declareStreamers(descs);
93  buildClassCache(descs);
95  reg.updateFromInput(descs);
96  fillProductRegistryTransients(header.processConfigurations(), reg);
97  BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
98  }
99  }
std::vector< BranchDescription > SendDescs
static bool updateFromInput(BranchIDLists const &bidlists, std::string const &fileName)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void fillProductRegistryTransients(std::vector< ProcessConfiguration > const &pcVec, ProductRegistry const &preg, bool okToRegister=false)
static void declareStreamers(SendDescs const &descs)
static void buildClassCache(SendDescs const &descs)
void loadExtraClasses()
Definition: ClassFiller.cc:47
virtual EventPrincipal* edm::StreamerInputSource::read ( )
private pure virtual
EventPrincipal * edm::StreamerInputSource::readEvent_ ( )
private virtual

Implements edm::InputSource.

Definition at line 144 of file StreamerInputSource.cc.

References eventCached_, edm::InputSource::eventPrincipalCache(), newLumi_, and newRun_.

144  {
145  assert(!newRun_);
146  assert(!newLumi_);
147  assert(eventCached_);
148  eventCached_ = false;
149  return eventPrincipalCache();
150  }
EventPrincipal *const eventPrincipalCache()
Definition: InputSource.cc:156
boost::shared_ptr< FileBlock > edm::StreamerInputSource::readFile_ ( )
private virtual

Reimplemented from edm::InputSource.

Definition at line 71 of file StreamerInputSource.cc.

71  {
72  return boost::shared_ptr<FileBlock>(new FileBlock);
73  }
boost::shared_ptr< LuminosityBlockAuxiliary > edm::StreamerInputSource::readLuminosityBlockAuxiliary_ ( )
private virtual

Implements edm::InputSource.

Definition at line 135 of file StreamerInputSource.cc.

References edm::InputSource::luminosityBlockAuxiliary(), newLumi_, and newRun_.

135  {
136  assert(!newRun_);
137  assert(newLumi_);
138  assert(luminosityBlockAuxiliary());
139  newLumi_ = false;
140  return luminosityBlockAuxiliary();
141  }
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
boost::shared_ptr< RunAuxiliary > edm::StreamerInputSource::readRunAuxiliary_ ( )
private virtual

Implements edm::InputSource.

Definition at line 127 of file StreamerInputSource.cc.

References newRun_, and edm::InputSource::runAuxiliary().

127  {
128  assert(newRun_);
129  assert(runAuxiliary());
130  newRun_ = false;
131  return runAuxiliary();
132  }
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
void edm::StreamerInputSource::resetAfterEndRun ( )
protected

Definition at line 432 of file StreamerInputSource.cc.

References eventCached_, newLumi_, newRun_, edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), edm::InputSource::resetRunAuxiliary(), and runEndingFlag_.

432  {
433  // called from an online streamer source to reset after a stop command
434  // so an enable command will work
435  resetLuminosityBlockAuxiliary();
436  resetRunAuxiliary();
437  newRun_ = newLumi_ = true;
438  assert(!eventCached_);
439  reset();
440  runEndingFlag_ = false;
441  }
void resetLuminosityBlockAuxiliary() const
Definition: InputSource.h:306
void resetRunAuxiliary() const
Definition: InputSource.h:303
void reset() const
Definition: InputSource.h:309
void edm::StreamerInputSource::setEndRun ( )
inline protected

Definition at line 61 of file StreamerInputSource.h.

References runEndingFlag_.

Referenced by edm::EventStreamHttpReader::read().

void edm::StreamerInputSource::setRun ( RunNumber_t  r)
private virtual

Reimplemented from edm::InputSource.

Definition at line 443 of file StreamerInputSource.cc.

References edm::hlt::Exception, and edm::errors::LogicError.

443  {
444  // Need to define a dummy setRun here or else the InputSource::setRun is called
445  // if we have a source inheriting from this and wants to define a setRun method
446  throw Exception(errors::LogicError)
447  << "StreamerInputSource::setRun()\n"
448  << "Run number cannot be modified for this type of Input Source\n"
449  << "Contact a Storage Manager Developer\n";
450  }
unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
)
static

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 397 of file StreamerInputSource.cc.

References benchmark_cfg::cerr, edm::hlt::Exception, FDEBUG, and runTheMatrix::ret.

Referenced by edm::StreamDQMDeserializer::deserializeDQMEvent(), and deserializeEvent().

400  {
401  unsigned long origSize = expectedFullSize;
402  unsigned long uncompressedSize = expectedFullSize*1.1;
403  FDEBUG(1) << "Uncompress: original size = " << origSize
404  << ", compressed size = " << inputSize
405  << std::endl;
406  outputBuffer.resize(uncompressedSize);
407  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
408  inputBuffer, inputSize); // do not need compression level
409  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
410  if(ret == Z_OK) {
411  // check the length against original uncompressed length
412  FDEBUG(10) << " original size = " << origSize << " final size = "
413  << uncompressedSize << std::endl;
414  if(origSize != uncompressedSize) {
415  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
416  << origSize << " uncompress size = " << uncompressedSize << std::endl;
417  // we throw an error and return without event! null pointer
418  throw cms::Exception("StreamDeserialization","Uncompression error")
419  << "mismatch event lengths should be" << origSize << " got "
420  << uncompressedSize << "\n";
421  }
422  } else {
423  // we throw an error and return without event! null pointer
424  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
425  << ret << std::endl;
426  throw cms::Exception("StreamDeserialization","Uncompression error")
427  << "Error code = " << ret << "\n ";
428  }
429  return (unsigned int) uncompressedSize;
430  }
#define FDEBUG(lev)
Definition: DebugMacros.h:18

Member Data Documentation

std::vector<unsigned char> edm::StreamerInputSource::dest_
private

Definition at line 102 of file StreamerInputSource.h.

Referenced by deserializeEvent().

bool edm::StreamerInputSource::eventCached_
private
bool edm::StreamerInputSource::inputFileTransitionsEachEvent_
protected

Definition at line 64 of file StreamerInputSource.h.

Referenced by getNextItemType().

bool edm::StreamerInputSource::newLumi_
private
bool edm::StreamerInputSource::newRun_
private
std::string edm::StreamerInputSource::processName_
static private

Definition at line 108 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

ProductGetter edm::StreamerInputSource::productGetter_
private

Definition at line 105 of file StreamerInputSource.h.

Referenced by deserializeEvent().

unsigned int edm::StreamerInputSource::protocolVersion_
static private

Definition at line 109 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

bool edm::StreamerInputSource::runEndingFlag_
private

Definition at line 104 of file StreamerInputSource.h.

Referenced by getNextItemType(), resetAfterEndRun(), and setEndRun().

TClass* edm::StreamerInputSource::tc_
private

Definition at line 101 of file StreamerInputSource.h.

Referenced by deserializeEvent().

TBufferFile edm::StreamerInputSource::xbuf_
private

Definition at line 103 of file StreamerInputSource.h.

Referenced by deserializeEvent().