CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 
29 
35 
36 #include <string>
37 #include <iostream>
38 #include <set>
39 
40 namespace edm {
namespace {
  // Initial element count handed to both dest_ (a vector of unsigned char)
  // and xbuf_ (the TBuffer read buffer) in the constructor: 2^20 = 1024*1024.
  const int init_size = 1 << 20;
}
44 
47 
48 
50  ParameterSet const& pset,
51  InputSourceDescription const& desc):
52  InputSource(pset, desc),
53  // The default value for the following parameter gets defined in at least one derived class,
54  // where it has a different default value.
55  inputFileTransitionsEachEvent_(
56  pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", false)),
57  newRun_(true),
58  newLumi_(true),
59  eventCached_(false),
60  tc_(getTClass(typeid(SendEvent))),
61  dest_(init_size),
62  xbuf_(TBuffer::kRead, init_size),
63  runEndingFlag_(false),
64  productGetter_() {
65  }
66 
68 
69  // ---------------------------------------
70  boost::shared_ptr<FileBlock>
72  return boost::shared_ptr<FileBlock>(new FileBlock);
73  }
74 
75  void
77 
78  SendDescs const& descs = header.descs();
79 
80  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
81 
82  if (subsequent) {
83  ProductRegistry pReg;
84  pReg.updateFromInput(descs);
86  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
87  if (!mergeInfo.empty()) {
88  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
89  }
90  BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
91  } else {
92  declareStreamers(descs);
93  buildClassCache(descs);
95  reg.updateFromInput(descs);
97  BranchIDListHelper::updateFromInput(header.branchIDLists(), std::string());
98  }
99  }
100 
101  void
103  SendDescs::const_iterator i(descs.begin()), e(descs.end());
104 
105  for(; i != e; ++i) {
106  //pi->init();
107  std::string const real_name = wrappedClassName(i->className());
108  FDEBUG(6) << "declare: " << real_name << std::endl;
109  loadCap(real_name);
110  }
111  }
112 
113 
114  void
116  SendDescs::const_iterator i(descs.begin()), e(descs.end());
117 
118  for(; i != e; ++i) {
119  //pi->init();
120  std::string const real_name = wrappedClassName(i->className());
121  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
122  doBuildRealData(real_name);
123  }
124  }
125 
126  boost::shared_ptr<RunAuxiliary>
128  assert(newRun_);
129  assert(runAuxiliary());
130  newRun_ = false;
131  return runAuxiliary();
132  }
133 
134  boost::shared_ptr<LuminosityBlockAuxiliary>
136  assert(!newRun_);
137  assert(newLumi_);
138  assert(luminosityBlockAuxiliary());
139  newLumi_ = false;
140  return luminosityBlockAuxiliary();
141  }
142 
145  assert(!newRun_);
146  assert(!newLumi_);
147  assert(eventCached_);
148  eventCached_ = false;
150  return eventPrincipalCache();
151  }
152 
155  if (runEndingFlag_) {
156  return IsStop;
157  }
158  if(newRun_ && runAuxiliary()) {
159  return IsRun;
160  }
162  return IsLumi;
163  }
164  if (eventCached_) {
165  return IsEvent;
166  }
170  }
171  read();
172  if (!eventCached_) {
173  return IsStop;
174  } else {
175  runEndingFlag_ = false;
177  return IsFile;
178  }
179  }
180  if(newRun_) {
181  return IsRun;
182  } else if(newLumi_) {
183  return IsLumi;
184  }
185  return IsEvent;
186  }
187 
192  std::auto_ptr<SendJobHeader>
194  if(initView.code() != Header::INIT)
195  throw cms::Exception("StreamTranslation","Registry deserialization error")
196  << "received wrong message type: expected INIT, got "
197  << initView.code() << "\n";
198 
199  // Get the process name and store it for protocol version 4 and above.
200  if (initView.protocolVersion() > 3) {
201 
202  processName_ = initView.processName();
203  protocolVersion_ = initView.protocolVersion();
204 
205  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
206  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
207  }
208 
209  // calculate the adler32 checksum
210  uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
211  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
212  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
213  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
214  if((uint32)adler32_chksum != initView.adler32_chksum()) {
215  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
216  << " chksum from registry data = " << adler32_chksum << " from header = "
217  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
218  // skip event (based on option?) or throw exception?
219  }
220 
221  TClass* desc = getTClass(typeid(SendJobHeader));
222 
223  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
224  (char*)initView.descData(),kFALSE);
225  RootDebug tracer(10,10);
226  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
227 
228  if(sd.get()==0) {
229  throw cms::Exception("StreamTranslation","Registry deserialization error")
230  << "Could not read the initial product registry list\n";
231  }
232 
233  return sd;
234  }
235 
240  void
242  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
243  ProcessConfigurationVector const& pcv = sd->processConfigurations();
244  mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
245  if (subsequent) {
247  }
248  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
249  pset::Registry& psetRegistry = *pset::Registry::instance();
250  for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
251  ParameterSet pset(i->second.pset());
252  pset.setID(i->first);
253  psetRegistry.insertMapped(pset);
254  }
256  for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
257  pcReg.insertMapped(*it);
258  }
259  }
260 
266  if(eventView.code() != Header::EVENT)
267  throw cms::Exception("StreamTranslation","Event deserialization error")
268  << "received wrong message type: expected EVENT, got "
269  << eventView.code() << "\n";
270  FDEBUG(9) << "Decode event: "
271  << eventView.event() << " "
272  << eventView.run() << " "
273  << eventView.size() << " "
274  << eventView.adler32_chksum() << " "
275  << eventView.eventLength() << " "
276  << eventView.eventData()
277  << std::endl;
278  EventSourceSentry(*this);
279  // uncompress if we need to
280  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
281  // need to get rid of this when 090 MTCC streamers are gotten rid of
282  unsigned long origsize = eventView.origDataSize();
283  unsigned long dest_size; //(should be >= eventView.origDataSize())
284 
285  uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
286  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
287  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
288  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
289  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
290  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
291  << " chksum from event = " << adler32_chksum << " from header = "
292  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
293  // skip event (based on option?) or throw exception?
294  }
295  if(origsize != 78 && origsize != 0) {
296  // compressed
297  dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
298  eventView.eventLength(), dest_, origsize);
299  } else { // not compressed
300  // we need to copy anyway the buffer as we are using dest in xbuf
301  dest_size = eventView.eventLength();
302  dest_.resize(dest_size);
303  unsigned char* pos = (unsigned char*) &dest_[0];
304  unsigned char* from = (unsigned char*) eventView.eventData();
305  std::copy(from,from+dest_size,pos);
306  }
307  //TBuffer xbuf(TBuffer::kRead, dest_size,
308  // (char*) &dest[0],kFALSE);
309  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
310  // (char*) eventView.eventData(),kFALSE);
311  xbuf_.Reset();
312  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
313  RootDebug tracer(10,10);
314 
316  std::auto_ptr<SendEvent> sd((SendEvent*)xbuf_.ReadObjectAny(tc_));
318 
319  if(sd.get()==0) {
320  throw cms::Exception("StreamTranslation","Event deserialization error")
321  << "got a null event from input stream\n";
322  }
323  ProcessHistoryRegistry::instance()->insertMapped(sd->processHistory());
324 
325  FDEBUG(5) << "Got event: " << sd->aux().id() << " " << sd->products().size() << std::endl;
326  if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sd->aux().run()) {
327  newRun_ = newLumi_ = true;
328  RunAuxiliary* runAuxiliary = new RunAuxiliary(sd->aux().run(), sd->aux().time(), Timestamp::invalidTimestamp());
329  runAuxiliary->setProcessHistoryID(sd->processHistory().id());
330  setRunAuxiliary(runAuxiliary);
332  }
335  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sd->aux().time(), Timestamp::invalidTimestamp());
336  luminosityBlockAuxiliary->setProcessHistoryID(sd->processHistory().id());
337  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
338  newLumi_ = true;
339  }
340 
341  boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sd->eventSelectionIDs()));
342  boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sd->branchListIndexes()));
344  eventPrincipalCache()->fillEventPrincipal(sd->aux(), boost::shared_ptr<LuminosityBlockPrincipal>(), ids, indexes);
346  eventCached_ = true;
347 
348  // no process name list handling
349 
350  SendProds & sps = sd->products();
351  for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
352  FDEBUG(10) << "check prodpair" << std::endl;
353  if(spi->desc() == 0)
354  throw cms::Exception("StreamTranslation","Empty Provenance");
355  FDEBUG(5) << "Prov:"
356  << " " << spi->desc()->className()
357  << " " << spi->desc()->productInstanceName()
358  << " " << spi->desc()->branchID()
359  << std::endl;
360 
361  ConstBranchDescription branchDesc(*spi->desc());
362  // This ProductProvenance constructor inserts into the entry description registry
363  ProductProvenance productProvenance(spi->branchID(), *spi->parents());
364 
365  if(spi->prod() != 0) {
366  FDEBUG(10) << "addgroup next " << spi->branchID() << std::endl;
367  eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
368  FDEBUG(10) << "addgroup done" << std::endl;
369  } else {
370  FDEBUG(10) << "addgroup empty next " << spi->branchID() << std::endl;
371  eventPrincipalCache()->putOnRead(branchDesc, spi->prod(), productProvenance);
372  FDEBUG(10) << "addgroup empty done" << std::endl;
373  }
374  spi->clear();
375  }
376 
377  FDEBUG(10) << "Size = " << eventPrincipalCache()->size() << std::endl;
378 
379  return eventPrincipalCache();
380  }
381 
390  unsigned int
391  StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
392  unsigned int inputSize,
393  std::vector<unsigned char>& outputBuffer,
394  unsigned int expectedFullSize) {
395  unsigned long origSize = expectedFullSize;
396  unsigned long uncompressedSize = expectedFullSize*1.1;
397  FDEBUG(1) << "Uncompress: original size = " << origSize
398  << ", compressed size = " << inputSize
399  << std::endl;
400  outputBuffer.resize(uncompressedSize);
401  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
402  inputBuffer, inputSize); // do not need compression level
403  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
404  if(ret == Z_OK) {
405  // check the length against original uncompressed length
406  FDEBUG(10) << " original size = " << origSize << " final size = "
407  << uncompressedSize << std::endl;
408  if(origSize != uncompressedSize) {
409  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
410  << origSize << " uncompress size = " << uncompressedSize << std::endl;
411  // we throw an error and return without event! null pointer
412  throw cms::Exception("StreamDeserialization","Uncompression error")
413  << "mismatch event lengths should be" << origSize << " got "
414  << uncompressedSize << "\n";
415  }
416  } else {
417  // we throw an error and return without event! null pointer
418  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
419  << ret << std::endl;
420  throw cms::Exception("StreamDeserialization","Uncompression error")
421  << "Error code = " << ret << "\n ";
422  }
423  return (unsigned int) uncompressedSize;
424  }
425 
427  // called from an online streamer source to reset after a stop command
428  // so an enable command will work
431  newRun_ = newLumi_ = true;
432  assert(!eventCached_);
433  reset();
434  runEndingFlag_ = false;
435  }
436 
438  // Need to define a dummy setRun here, or else InputSource::setRun is called
439  // if a source that inherits from this class wants to define a setRun method.
441  << "StreamerInputSource::setRun()\n"
442  << "Run number cannot be modified for this type of Input Source\n"
443  << "Contact a Storage Manager Developer\n";
444  }
445 
447 
449 
452  return eventPrincipal_ ? eventPrincipal_->getIt(id) : WrapperHolder();
453  }
454 
455  void
457  eventPrincipal_ = ep;
458  }
459 
460  void
462  // The default value for "inputFileTransitionsEachEvent" gets defined in the derived class
463  // as it depends on the derived class. So, we cannot redefine it here.
465  }
466 }
void setLuminosityBlockPrincipal(boost::shared_ptr< LuminosityBlockPrincipal > const &lbp)
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
size_t size() const
Definition: Principal.cc:146
uint32 lumi() const
Definition: EventMessage.cc:85
virtual WrapperHolder getIt(edm::ProductID const &id) const
const uint8 * eventData() const
Definition: EventMessage.h:78
const uint8 * descData() const
Definition: InitMessage.h:87
static ThreadSafeRegistry * instance()
std::string hostName() const
virtual boost::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary_()
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:34
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
static bool updateFromInput(BranchIDLists const &bidlists, std::string const &fileName)
virtual EventPrincipal * readEvent_()
static std::string processName_
void setRefCoreStreamer(bool resetAll=false)
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:238
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:606
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode parametersMustMatch=BranchDescription::Permissive, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void setRun(RunNumber_t r)
virtual ItemType getNextItemType()
bool insertMapped(value_type const &v)
boost::shared_ptr< LuminosityBlockPrincipal > const luminosityBlockPrincipal() const
Definition: InputSource.cc:281
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:87
const int init_size
void fillProductRegistryTransients(std::vector< ProcessConfiguration > const &pcVec, ProductRegistry const &preg, bool okToRegister=false)
static void fixBranchListIndexes(BranchListIndexes &indexes)
std::vector< EventSelectionID > EventSelectionIDVector
uint32 run() const
Definition: EventMessage.cc:73
void setID(ParameterSetID const &id) const
uint32 adler32_chksum() const
Definition: EventMessage.h:95
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
uint32 eventLength() const
Definition: EventMessage.h:80
void adjustEventToNewProductRegistry(boost::shared_ptr< ProductRegistry const > reg)
static void declareStreamers(SendDescs const &descs)
uint32 adler32_chksum() const
Definition: InitMessage.h:90
void resetLuminosityBlockAuxiliary() const
Definition: InputSource.h:302
virtual EventPrincipal * read()=0
std::vector< BranchListIndex > BranchListIndexes
void resetRunAuxiliary() const
Definition: InputSource.h:299
static void buildClassCache(SendDescs const &descs)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:298
uint32 code() const
Definition: EventMessage.h:75
std::vector< ProcessConfiguration > const & processConfigurations() const
static unsigned int protocolVersion_
virtual boost::shared_ptr< FileBlock > readFile_()
std::string hostName() const
Definition: InitMessage.cc:186
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, bool subsequent)
void setProcessHistoryID(ProcessHistoryID const &phid)
uint32 event() const
Definition: EventMessage.cc:79
std::vector< StreamedProduct > SendProds
PrincipalCache const & principalCache() const
Definition: InputSource.h:312
void fillEventPrincipal(EventAuxiliary const &aux, boost::shared_ptr< LuminosityBlockPrincipal > lbp, boost::shared_ptr< EventSelectionIDVector > eventSelectionIDs=boost::shared_ptr< EventSelectionIDVector >(), boost::shared_ptr< BranchListIndexes > branchListIndexes=boost::shared_ptr< BranchListIndexes >(), boost::shared_ptr< BranchMapper > mapper=boost::shared_ptr< BranchMapper >(new BranchMapper), DelayedReader *reader=0)
unsigned int uint32
Definition: MsgTools.h:13
static std::string from(" from ")
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:612
uint32 protocolVersion() const
Definition: InitMessage.cc:106
static Timestamp const & invalidTimestamp()
Definition: Timestamp.cc:83
double sd
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:295
std::string wrappedClassName(std::string const &iFullName)
void putOnRead(ConstBranchDescription const &bd, void const *product, ProductProvenance const &productProvenance)
uint32 size() const
Definition: EventMessage.h:76
BranchIDLists const & branchIDLists() const
uint32 origDataSize() const
Definition: EventMessage.cc:91
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:235
EventPrincipal * deserializeEvent(EventMsgView const &eventView)
boost::shared_ptr< ProductRegistry const > productRegistry() const
Accessor for product registry.
Definition: InputSource.h:155
static std::auto_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
void reset() const
Definition: InputSource.h:305
uint32 code() const
Definition: InitMessage.h:66
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:297
author Stefano ARGIRO author Bill Tanenbaum
virtual boost::shared_ptr< RunAuxiliary > readRunAuxiliary_()
std::string processName() const
Definition: InitMessage.cc:123
void updateFromInput(ProductList const &other)
static void fillDescription(ParameterSetDescription &desc)
Definition: InputSource.cc:129
void setProcessHistoryID(ProcessHistoryID const &phid)
Definition: RunAuxiliary.h:36
unsigned int RunNumber_t
Definition: EventRange.h:32
uint32 descLength() const
Definition: InitMessage.h:86
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
SendDescs const & descs() const
EventPrincipal * eventPrincipalCache()
Definition: InputSource.cc:139
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
void loadExtraClasses()
Definition: ClassFiller.cc:47
std::vector< unsigned char > dest_
void loadCap(const std::string &name)
Definition: ClassFiller.cc:22