CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 
29 
34 
35 #include <string>
36 #include <iostream>
37 #include <set>
38 
39 namespace edm {
40  namespace {
41  int const init_size = 1024*1024;
42  }
43 
46 
47 
49  ParameterSet const& pset,
50  InputSourceDescription const& desc):
51  RawInputSource(pset, desc),
52  tc_(getTClass(typeid(SendEvent))),
53  dest_(init_size),
54  xbuf_(TBuffer::kRead, init_size),
55  sendEvent_(),
56  productGetter_(),
57  adjustEventToNewProductRegistry_(false) {
58  }
59 
61 
62  // ---------------------------------------
63  std::unique_ptr<FileBlock>
65  return std::unique_ptr<FileBlock>(new FileBlock);
66  }
67 
68  void
69  StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, BranchIDListHelper& branchIDListHelper, bool subsequent) {
70 
71  SendDescs const& descs = header.descs();
72 
73  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
74 
75  if (subsequent) {
76  ProductRegistry pReg;
77  pReg.updateFromInput(descs);
80  if (!mergeInfo.empty()) {
81  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
82  }
83  branchIDListHelper.updateFromInput(header.branchIDLists());
84  } else {
85  declareStreamers(descs);
86  buildClassCache(descs);
88  reg.updateFromInput(descs);
90  branchIDListHelper.updateFromInput(header.branchIDLists());
91  }
92  }
93 
94  void
96  SendDescs::const_iterator i(descs.begin()), e(descs.end());
97 
98  for(; i != e; ++i) {
99  //pi->init();
100  std::string const real_name = wrappedClassName(i->className());
101  FDEBUG(6) << "declare: " << real_name << std::endl;
102  loadCap(real_name);
103  }
104  }
105 
106 
107  void
109  SendDescs::const_iterator i(descs.begin()), e(descs.end());
110 
111  for(; i != e; ++i) {
112  //pi->init();
113  std::string const real_name = wrappedClassName(i->className());
114  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
115  doBuildRealData(real_name);
116  }
117  }
118 
123  std::auto_ptr<SendJobHeader>
125  if(initView.code() != Header::INIT)
126  throw cms::Exception("StreamTranslation","Registry deserialization error")
127  << "received wrong message type: expected INIT, got "
128  << initView.code() << "\n";
129 
130  //Get the process name and store if for Protocol version 4 and above.
131  if (initView.protocolVersion() > 3) {
132 
133  processName_ = initView.processName();
134  protocolVersion_ = initView.protocolVersion();
135 
136  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
137  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
138  }
139 
140  // calculate the adler32 checksum
141  uint32_t adler32_chksum = cms::Adler32((char*)initView.descData(),initView.descLength());
142  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
143  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
144  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
145  if((uint32)adler32_chksum != initView.adler32_chksum()) {
146  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
147  << " chksum from registry data = " << adler32_chksum << " from header = "
148  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
149  // skip event (based on option?) or throw exception?
150  }
151 
152  TClass* desc = getTClass(typeid(SendJobHeader));
153 
154  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
155  (char*)initView.descData(),kFALSE);
156  RootDebug tracer(10,10);
157  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
158 
159  if(sd.get()==0) {
160  throw cms::Exception("StreamTranslation","Registry deserialization error")
161  << "Could not read the initial product registry list\n";
162  }
163 
164  return sd;
165  }
166 
171  void
173  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
174  ProcessConfigurationVector const& pcv = sd->processConfigurations();
176  if (subsequent) {
178  }
179  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
180  pset::Registry& psetRegistry = *pset::Registry::instance();
181  for (SendJobHeader::ParameterSetMap::const_iterator i = psetMap.begin(), iEnd = psetMap.end(); i != iEnd; ++i) {
182  ParameterSet pset(i->second.pset());
183  pset.setID(i->first);
184  psetRegistry.insertMapped(pset);
185  }
187  for (ProcessConfigurationVector::const_iterator it = pcv.begin(), itEnd = pcv.end(); it != itEnd; ++it) {
188  pcReg.insertMapped(*it);
189  }
190  }
191 
195  void
197  if(eventView.code() != Header::EVENT)
198  throw cms::Exception("StreamTranslation","Event deserialization error")
199  << "received wrong message type: expected EVENT, got "
200  << eventView.code() << "\n";
201  FDEBUG(9) << "Decode event: "
202  << eventView.event() << " "
203  << eventView.run() << " "
204  << eventView.size() << " "
205  << eventView.adler32_chksum() << " "
206  << eventView.eventLength() << " "
207  << eventView.eventData()
208  << std::endl;
209  EventSourceSentry sentry(*this);
210  // uncompress if we need to
211  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
212  // need to get rid of this when 090 MTCC streamers are gotten rid of
213  unsigned long origsize = eventView.origDataSize();
214  unsigned long dest_size; //(should be >= eventView.origDataSize())
215 
216  uint32_t adler32_chksum = cms::Adler32((char*)eventView.eventData(), eventView.eventLength());
217  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
218  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
219  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
220  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
221  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
222  << " chksum from event = " << adler32_chksum << " from header = "
223  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
224  // skip event (based on option?) or throw exception?
225  }
226  if(origsize != 78 && origsize != 0) {
227  // compressed
228  dest_size = uncompressBuffer((unsigned char*)eventView.eventData(),
229  eventView.eventLength(), dest_, origsize);
230  } else { // not compressed
231  // we need to copy anyway the buffer as we are using dest in xbuf
232  dest_size = eventView.eventLength();
233  dest_.resize(dest_size);
234  unsigned char* pos = (unsigned char*) &dest_[0];
235  unsigned char* from = (unsigned char*) eventView.eventData();
236  std::copy(from,from+dest_size,pos);
237  }
238  //TBuffer xbuf(TBuffer::kRead, dest_size,
239  // (char*) &dest[0],kFALSE);
240  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
241  // (char*) eventView.eventData(),kFALSE);
242  xbuf_.Reset();
243  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
244  RootDebug tracer(10,10);
245 
247  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
249 
250  if(sendEvent_.get()==0) {
251  throw cms::Exception("StreamTranslation","Event deserialization error")
252  << "got a null event from input stream\n";
253  }
255 
256  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
257  if(runAuxiliary().get() == 0 || runAuxiliary()->run() != sendEvent_->aux().run()) {
259  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
260  setRunAuxiliary(runAuxiliary);
262  }
265  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
266  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
267  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
268  }
269  setEventCached();
270  }
271 
276  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
277  assert(eventOK);
279  }
280  boost::shared_ptr<EventSelectionIDVector> ids(new EventSelectionIDVector(sendEvent_->eventSelectionIDs()));
281  boost::shared_ptr<BranchListIndexes> indexes(new BranchListIndexes(sendEvent_->branchListIndexes()));
282  branchIDListHelper()->fixBranchListIndexes(*indexes);
283  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), ids, indexes);
284  productGetter_.setEventPrincipal(&eventPrincipal);
285 
286  // no process name list handling
287 
288  SendProds & sps = sendEvent_->products();
289  for(SendProds::iterator spi = sps.begin(), spe = sps.end(); spi != spe; ++spi) {
290  FDEBUG(10) << "check prodpair" << std::endl;
291  if(spi->desc() == 0)
292  throw cms::Exception("StreamTranslation","Empty Provenance");
293  FDEBUG(5) << "Prov:"
294  << " " << spi->desc()->className()
295  << " " << spi->desc()->productInstanceName()
296  << " " << spi->desc()->branchID()
297  << std::endl;
298 
299  ConstBranchDescription branchDesc(*spi->desc());
300  // This ProductProvenance constructor inserts into the entry description registry
301  ProductProvenance productProvenance(spi->branchID(), *spi->parents());
302 
303  if(spi->prod() != 0) {
304  FDEBUG(10) << "addproduct next " << spi->branchID() << std::endl;
305  eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
306  FDEBUG(10) << "addproduct done" << std::endl;
307  } else {
308  FDEBUG(10) << "addproduct empty next " << spi->branchID() << std::endl;
309  eventPrincipal.putOnRead(branchDesc, spi->prod(), productProvenance);
310  FDEBUG(10) << "addproduct empty done" << std::endl;
311  }
312  spi->clear();
313  }
314 
315  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
316 
317  return &eventPrincipal;
318  }
319 
328  unsigned int
329  StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
330  unsigned int inputSize,
331  std::vector<unsigned char>& outputBuffer,
332  unsigned int expectedFullSize) {
333  unsigned long origSize = expectedFullSize;
334  unsigned long uncompressedSize = expectedFullSize*1.1;
335  FDEBUG(1) << "Uncompress: original size = " << origSize
336  << ", compressed size = " << inputSize
337  << std::endl;
338  outputBuffer.resize(uncompressedSize);
339  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
340  inputBuffer, inputSize); // do not need compression level
341  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
342  if(ret == Z_OK) {
343  // check the length against original uncompressed length
344  FDEBUG(10) << " original size = " << origSize << " final size = "
345  << uncompressedSize << std::endl;
346  if(origSize != uncompressedSize) {
347  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
348  << origSize << " uncompress size = " << uncompressedSize << std::endl;
349  // we throw an error and return without event! null pointer
350  throw cms::Exception("StreamDeserialization","Uncompression error")
351  << "mismatch event lengths should be" << origSize << " got "
352  << uncompressedSize << "\n";
353  }
354  } else {
355  // we throw an error and return without event! null pointer
356  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
357  << ret << std::endl;
358  throw cms::Exception("StreamDeserialization","Uncompression error")
359  << "Error code = " << ret << "\n ";
360  }
361  return (unsigned int) uncompressedSize;
362  }
363 
365  // called from an online streamer source to reset after a stop command
366  // so an enable command will work
369  assert(!eventCached());
370  reset();
371  }
372 
374  // Need to define a dummy setRun here or else the InputSource::setRun is called
375  // if we have a source inheriting from this and wants to define a setRun method
377  << "StreamerInputSource::setRun()\n"
378  << "Run number cannot be modified for this type of Input Source\n"
379  << "Contact a Storage Manager Developer\n";
380  }
381 
383 
385 
388  return eventPrincipal_ ? eventPrincipal_->getIt(id) : WrapperHolder();
389  }
390 
391  void
393  eventPrincipal_ = ep;
394  }
395 
396  void
399  }
400 }
static void fillDescription(ParameterSetDescription &description)
int i
Definition: DBlmapReader.cc:9
void fillEventPrincipal(EventAuxiliary const &aux, boost::shared_ptr< EventSelectionIDVector > eventSelectionIDs=boost::shared_ptr< EventSelectionIDVector >(), boost::shared_ptr< BranchListIndexes > branchListIndexes=boost::shared_ptr< BranchListIndexes >(), boost::shared_ptr< BranchMapper > mapper=boost::shared_ptr< BranchMapper >(new BranchMapper), DelayedReader *reader=0)
size_t size() const
Definition: Principal.cc:214
uint32 lumi() const
Definition: EventMessage.cc:85
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, bool subsequent)
virtual WrapperHolder getIt(edm::ProductID const &id) const
const uint8 * eventData() const
Definition: EventMessage.h:78
const uint8 * descData() const
Definition: InitMessage.h:87
static ThreadSafeRegistry * instance()
std::string hostName() const
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:30
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:326
static std::string processName_
void setRefCoreStreamer(bool resetAll=false)
boost::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:247
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:605
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode parametersMustMatch=BranchDescription::Permissive, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void setRun(RunNumber_t r)
bool insertMapped(value_type const &v)
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:77
const int init_size
void fillProductRegistryTransients(std::vector< ProcessConfiguration > const &pcVec, ProductRegistry const &preg, bool okToRegister=false)
std::vector< EventSelectionID > EventSelectionIDVector
uint32 run() const
Definition: EventMessage.cc:73
void setID(ParameterSetID const &id) const
uint32 adler32_chksum() const
Definition: EventMessage.h:95
ProcessConfigurationRegistry::vector_type ProcessConfigurationVector
uint32 eventLength() const
Definition: EventMessage.h:80
static void declareStreamers(SendDescs const &descs)
uint32 adler32_chksum() const
Definition: InitMessage.h:90
std::vector< BranchListIndex > BranchListIndexes
static void buildClassCache(SendDescs const &descs)
boost::shared_ptr< BranchIDListHelper > branchIDListHelper() const
Accessor for branchIDListHelper.
Definition: InputSource.h:165
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:322
uint32 code() const
Definition: EventMessage.h:75
std::unique_ptr< SendEvent > sendEvent_
std::vector< ProcessConfiguration > const & processConfigurations() const
static unsigned int protocolVersion_
void deserializeEvent(EventMsgView const &eventView)
std::string hostName() const
Definition: InitMessage.cc:186
void setProcessHistoryID(ProcessHistoryID const &phid)
uint32 event() const
Definition: EventMessage.cc:79
std::vector< StreamedProduct > SendProds
virtual EventPrincipal * read(EventPrincipal &eventPrincipal)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:349
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:611
uint32 protocolVersion() const
Definition: InitMessage.cc:106
static Timestamp const & invalidTimestamp()
Definition: Timestamp.cc:83
double sd
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:316
std::string wrappedClassName(std::string const &iFullName)
void putOnRead(ConstBranchDescription const &bd, void const *product, ProductProvenance const &productProvenance)
uint32 size() const
Definition: EventMessage.h:76
BranchIDLists const & branchIDLists() const
uint32 origDataSize() const
Definition: EventMessage.cc:91
bool updateFromInput(BranchIDLists const &bidlists)
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:330
boost::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:244
boost::shared_ptr< ProductRegistry const > productRegistry() const
Accessor for product registry.
Definition: InputSource.h:162
static std::auto_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
void reset() const
Definition: InputSource.h:334
uint32 code() const
Definition: InitMessage.h:66
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:318
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:229
std::string processName() const
Definition: InitMessage.cc:123
void updateFromInput(ProductList const &other)
void setProcessHistoryID(ProcessHistoryID const &phid)
Definition: RunAuxiliary.h:36
unsigned int RunNumber_t
Definition: EventRange.h:32
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:785
uint32 descLength() const
Definition: InitMessage.h:86
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
SendDescs const & descs() const
static void fillDescription(ParameterSetDescription &description)
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
void loadExtraClasses()
Definition: ClassFiller.cc:42
virtual std::unique_ptr< FileBlock > readFile_()
bool eventCached() const
Definition: InputSource.h:347
std::vector< unsigned char > dest_
void loadCap(const std::string &name)
Definition: ClassFiller.cc:19