CMS 3D CMS Logo

StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 
29 
33 
34 #include <string>
35 #include <iostream>
36 #include <set>
37 
38 namespace edm {
39  namespace {
40  int const init_size = 1024*1024;
41  }
42 
44  ParameterSet const& pset,
45  InputSourceDescription const& desc):
46  RawInputSource(pset, desc),
47  tc_(getTClass(typeid(SendEvent))),
48  dest_(init_size),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
51  eventPrincipalHolder_(),
52  adjustEventToNewProductRegistry_(false),
53  processName_(),
54  protocolVersion_(0U) {
55  }
56 
58 
59  // ---------------------------------------
60  std::unique_ptr<FileBlock>
62  return std::make_unique<FileBlock>();
63  }
64 
65  void
67  ProductRegistry& reg,
69  ThinnedAssociationsHelper& thinnedHelper,
70  bool subsequent) {
71 
72  SendDescs const& descs = header.descs();
73 
74  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
75 
76  if (subsequent) {
77  ProductRegistry pReg;
78  pReg.updateFromInput(descs);
80  if (!mergeInfo.empty()) {
81  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
82  }
83  branchIDListHelper.updateFromInput(header.branchIDLists());
84  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
85  } else {
86  declareStreamers(descs);
87  buildClassCache(descs);
89  if(!reg.frozen()) {
90  reg.updateFromInput(descs);
91  }
92  branchIDListHelper.updateFromInput(header.branchIDLists());
93  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
94  }
95  }
96 
97  void
99  std::vector<std::string> missingDictionaries;
100  std::vector<std::string> branchNamesForMissing;
101  std::vector<std::string> producedTypes;
102  for (auto const& item : descs) {
103  //pi->init();
104  std::string const real_name = wrappedClassName(item.className());
105  FDEBUG(6) << "declare: " << real_name << std::endl;
106  if (!loadCap(real_name, missingDictionaries)) {
107  branchNamesForMissing.emplace_back(item.branchName());
108  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
109  }
110  }
111  if (!missingDictionaries.empty()) {
112  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
113  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
114  }
115  }
116 
117 
118  void
120  for(auto const& item : descs) {
121  //pi->init();
122  std::string const real_name = wrappedClassName(item.className());
123  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
124  doBuildRealData(real_name);
125  }
126  }
127 
132  std::unique_ptr<SendJobHeader>
134  if(initView.code() != Header::INIT)
135  throw cms::Exception("StreamTranslation","Registry deserialization error")
136  << "received wrong message type: expected INIT, got "
137  << initView.code() << "\n";
138 
139  //Get the process name and store if for Protocol version 4 and above.
140  if (initView.protocolVersion() > 3) {
141 
142  processName_ = initView.processName();
143  protocolVersion_ = initView.protocolVersion();
144 
145  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
146  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
147  }
148 
149  // calculate the adler32 checksum
150  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(),initView.descLength());
151  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
152  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
153  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
154  if((uint32)adler32_chksum != initView.adler32_chksum()) {
155  // skip event (based on option?) or throw exception?
156  throw cms::Exception("StreamDeserialization", "Checksum error")
157  << " chksum from registry data = " << adler32_chksum << " from header = "
158  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
159  }
160 
161  TClass* desc = getTClass(typeid(SendJobHeader));
162 
163  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
164  const_cast<char*>((char const*)initView.descData()),kFALSE);
165  RootDebug tracer(10,10);
166  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
167 
168  if(sd.get() == nullptr) {
169  throw cms::Exception("StreamTranslation","Registry deserialization error")
170  << "Could not read the initial product registry list\n";
171  }
172 
173  sd->initializeTransients();
174  return sd;
175  }
176 
181  void
183  std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
185  if (subsequent) {
187  }
188  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
189  pset::Registry& psetRegistry = *pset::Registry::instance();
190  for (auto const& item : psetMap) {
191  ParameterSet pset(item.second.pset());
192  pset.setID(item.first);
193  psetRegistry.insertMapped(pset);
194  }
195  }
196 
200  void
202  if(eventView.code() != Header::EVENT)
203  throw cms::Exception("StreamTranslation","Event deserialization error")
204  << "received wrong message type: expected EVENT, got "
205  << eventView.code() << "\n";
206  FDEBUG(9) << "Decode event: "
207  << eventView.event() << " "
208  << eventView.run() << " "
209  << eventView.size() << " "
210  << eventView.adler32_chksum() << " "
211  << eventView.eventLength() << " "
212  << eventView.eventData()
213  << std::endl;
214  // uncompress if we need to
215  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
216  // need to get rid of this when 090 MTCC streamers are gotten rid of
217  unsigned long origsize = eventView.origDataSize();
218  unsigned long dest_size; //(should be >= eventView.origDataSize())
219 
220  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
221  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
222  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
223  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
224  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
225  // skip event (based on option?) or throw exception?
226  throw cms::Exception("StreamDeserialization", "Checksum error")
227  << " chksum from event = " << adler32_chksum << " from header = "
228  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
229  }
230  if(origsize != 78 && origsize != 0) {
231  // compressed
232  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
233  eventView.eventLength(), dest_, origsize);
234  } else { // not compressed
235  // we need to copy anyway the buffer as we are using dest in xbuf
236  dest_size = eventView.eventLength();
237  dest_.resize(dest_size);
238  unsigned char* pos = (unsigned char*) &dest_[0];
239  unsigned char const* from = (unsigned char const*) eventView.eventData();
240  std::copy(from,from+dest_size,pos);
241  }
242  //TBuffer xbuf(TBuffer::kRead, dest_size,
243  // (char const*) &dest[0],kFALSE);
244  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
245  // (char const*) eventView.eventData(),kFALSE);
246  xbuf_.Reset();
247  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
248  RootDebug tracer(10,10);
249 
250  //We do not yet know which EventPrincipal we will use, therefore
251  // we are using a new EventPrincipalHolder as a proxy. We need to
252  // make a new one instead of reusing the same one becuase when running
253  // multi-threaded there will be multiple EventPrincipals being used
254  // simultaneously.
255  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
257  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
259 
260  if(sendEvent_.get() == nullptr) {
261  throw cms::Exception("StreamTranslation","Event deserialization error")
262  << "got a null event from input stream\n";
263  }
264  processHistoryRegistryForUpdate().registerProcessHistory(sendEvent_->processHistory());
265 
266  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
267  if(runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
268  RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
269  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
270  setRunAuxiliary(runAuxiliary);
272  }
275  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
276  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
277  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
278  }
279  setEventCached();
280  }
281 
282  void
286  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
287  assert(eventOK);
289  }
290  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
291  BranchListIndexes indexes(sendEvent_->branchListIndexes());
292  branchIDListHelper()->fixBranchListIndexes(indexes);
293  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
294 
295  //We now know which eventPrincipal to use and we can reuse the slot in
296  // streamToEventPrincipalHolders to own the memory
297  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
298  if(streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() +1) {
299  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() +1);
300  }
302 
303  // no process name list handling
304 
305  SendProds& sps = sendEvent_->products();
306  for(auto& spitem : sps) {
307  FDEBUG(10) << "check prodpair" << std::endl;
308  if(spitem.desc() == nullptr)
309  throw cms::Exception("StreamTranslation","Empty Provenance");
310  FDEBUG(5) << "Prov:"
311  << " " << spitem.desc()->className()
312  << " " << spitem.desc()->productInstanceName()
313  << " " << spitem.desc()->branchID()
314  << std::endl;
315 
316  BranchDescription const branchDesc(*spitem.desc());
317  // This ProductProvenance constructor inserts into the entry description registry
318  if (spitem.parents()) {
319  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
320  if(spitem.prod() != nullptr) {
321  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
322  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), &productProvenance);
323  FDEBUG(10) << "addproduct done" << std::endl;
324  } else {
325  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
326  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), &productProvenance);
327  FDEBUG(10) << "addproduct empty done" << std::endl;
328  }
329  } else {
330  ProductProvenance const* productProvenance = nullptr;
331  if(spitem.prod() != nullptr) {
332  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
333  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
334  FDEBUG(10) << "addproduct done" << std::endl;
335  } else {
336  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
337  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
338  FDEBUG(10) << "addproduct empty done" << std::endl;
339  }
340  }
341  spitem.clear();
342  }
343 
344  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
345  }
346 
355  unsigned int
356  StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
357  unsigned int inputSize,
358  std::vector<unsigned char>& outputBuffer,
359  unsigned int expectedFullSize) {
360  unsigned long origSize = expectedFullSize;
361  unsigned long uncompressedSize = expectedFullSize*1.1;
362  FDEBUG(1) << "Uncompress: original size = " << origSize
363  << ", compressed size = " << inputSize
364  << std::endl;
365  outputBuffer.resize(uncompressedSize);
366  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
367  inputBuffer, inputSize); // do not need compression level
368  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
369  if(ret == Z_OK) {
370  // check the length against original uncompressed length
371  FDEBUG(10) << " original size = " << origSize << " final size = "
372  << uncompressedSize << std::endl;
373  if(origSize != uncompressedSize) {
374  // we throw an error and return without event! null pointer
375  throw cms::Exception("StreamDeserialization","Uncompression error")
376  << "mismatch event lengths should be" << origSize << " got "
377  << uncompressedSize << "\n";
378  }
379  } else {
380  // we throw an error and return without event! null pointer
381  throw cms::Exception("StreamDeserialization","Uncompression error")
382  << "Error code = " << ret << "\n ";
383  }
384  return (unsigned int) uncompressedSize;
385  }
386 
388  // called from an online streamer source to reset after a stop command
389  // so an enable command will work
392  assert(!eventCached());
393  reset();
394  }
395 
397  // Need to define a dummy setRun here or else the InputSource::setRun is called
398  // if we have a source inheriting from this and wants to define a setRun method
400  << "StreamerInputSource::setRun()\n"
401  << "Run number cannot be modified for this type of Input Source\n"
402  << "Contact a Storage Manager Developer\n";
403  }
404 
406 
408 
409  WrapperBase const*
411  return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
412  }
413 
414  WrapperBase const*
416  return eventPrincipal_ ? eventPrincipal_->getThinnedProduct(id, index) : nullptr;
417  }
418 
419  void
421  std::vector<WrapperBase const*>& wrappers,
422  std::vector<unsigned int>& keys) const {
423  if (eventPrincipal_) eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
424  }
425 
426 
427  unsigned int
429  assert(eventPrincipal_ != nullptr);
431  }
432 
433  void
435  eventPrincipal_ = ep;
436  }
437 
438  void
441  }
442 }
void read(EventPrincipal &eventPrincipal) override
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:166
static void fillDescription(ParameterSetDescription &description)
size_t size() const
Definition: Principal.cc:246
uint32 lumi() const
Definition: EventMessage.cc:85
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
WrapperBase const * getThinnedProduct(ProductID const &pid, unsigned int &key) const override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:339
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
const uint8 * eventData() const
Definition: EventMessage.h:82
const uint8 * descData() const
Definition: InitMessage.h:93
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
std::string hostName() const
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
bool registerProcessHistory(ProcessHistory const &processHistory)
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:350
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
void setRefCoreStreamer(bool resetAll=false)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:504
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
void setRun(RunNumber_t r) override
#define nullptr
#define FDEBUG(lev)
Definition: DebugMacros.h:18
void updateFromPrimaryInput(ThinnedAssociationsHelper const &)
std::vector< EventSelectionID > EventSelectionIDVector
uint32 run() const
Definition: EventMessage.cc:73
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &foundContainers, std::vector< unsigned int > &keys) const override
uint32 adler32_chksum() const
Definition: EventMessage.h:99
uint32 eventLength() const
Definition: EventMessage.h:84
std::unique_ptr< FileBlock > readFile_() override
static void declareStreamers(SendDescs const &descs)
uint32 adler32_chksum() const
Definition: InitMessage.h:96
std::vector< BranchListIndex > BranchListIndexes
static void buildClassCache(SendDescs const &descs)
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:346
uint32 code() const
Definition: EventMessage.h:79
StreamID streamID() const
void deserializeEvent(EventMsgView const &eventView)
std::string hostName() const
Definition: InitMessage.cc:190
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &wrappers, std::vector< unsigned int > &keys) const override
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
bool insertMapped(value_type const &v, bool forceUpdate=false)
Definition: Registry.cc:35
void setProcessHistoryID(ProcessHistoryID const &phid)
std::vector< StreamedProduct > SendProds
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:371
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
ThinnedAssociationsHelper const & thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:510
uint32 protocolVersion() const
Definition: InitMessage.cc:110
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:170
unsigned int value() const
Definition: StreamID.h:46
double sd
std::string wrappedClassName(std::string const &iFullName)
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const *productProvenance) const
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
uint32 size() const
Definition: EventMessage.h:80
BranchIDLists const & branchIDLists() const
TClass * getTClass(const std::type_info &ti)
uint32 origDataSize() const
Definition: EventMessage.cc:91
unsigned int transitionIndex() const
WrapperBase const * getThinnedProduct(ProductID const &, unsigned int &) const override
bool updateFromInput(BranchIDLists const &bidlists)
WrapperBase const * getIt(ProductID const &id) const override
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:250
edm::propagate_const< TClass * > tc_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:354
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:340
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
HLT enums.
void reset() const
Definition: InputSource.h:358
uint32 code() const
Definition: InitMessage.h:72
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:342
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:162
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:261
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:174
std::string processName() const
Definition: InitMessage.cc:127
void updateFromInput(ProductList const &other)
const int init_size
void setProcessHistoryID(ProcessHistoryID const &phid)
Definition: RunAuxiliary.h:36
unsigned int RunNumber_t
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:872
uint32 descLength() const
Definition: InitMessage.h:92
uint64 event() const
Definition: EventMessage.cc:79
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:253
SendDescs const & descs() const
static void fillDescription(ParameterSetDescription &description)
def move(src, dest)
Definition: eostools.py:510
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
WrapperBase const * getIt(ProductID const &pid) const override
static Registry * instance()
Definition: Registry.cc:12
void loadExtraClasses()
Definition: ClassFiller.cc:34
bool eventCached() const
Definition: InputSource.h:369
std::vector< unsigned char > dest_