CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 
28 
32 
33 #include <string>
34 #include <iostream>
35 #include <set>
36 
37 namespace edm {
38  namespace {
39  int const init_size = 1024*1024;
40  }
41 
43  ParameterSet const& pset,
44  InputSourceDescription const& desc):
45  RawInputSource(pset, desc),
46  tc_(getTClass(typeid(SendEvent))),
47  dest_(init_size),
48  xbuf_(TBuffer::kRead, init_size),
49  sendEvent_(),
50  eventPrincipalHolder_(),
51  adjustEventToNewProductRegistry_(false),
52  processName_(),
53  protocolVersion_(0U) {
54  }
55 
57 
58  // ---------------------------------------
59  std::unique_ptr<FileBlock>
61  return std::unique_ptr<FileBlock>(new FileBlock);
62  }
63 
64  void
66  ProductRegistry& reg,
67  BranchIDListHelper& branchIDListHelper,
68  ThinnedAssociationsHelper& thinnedHelper,
69  bool subsequent) {
70 
71  SendDescs const& descs = header.descs();
72 
73  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
74 
75  if (subsequent) {
76  ProductRegistry pReg;
77  pReg.updateFromInput(descs);
79  if (!mergeInfo.empty()) {
80  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
81  }
82  branchIDListHelper.updateFromInput(header.branchIDLists());
83  thinnedHelper.updateFromInput(header.thinnedAssociationsHelper(), false, std::vector<BranchID>());
84  } else {
85  declareStreamers(descs);
86  buildClassCache(descs);
88  if(!reg.frozen()) {
89  reg.updateFromInput(descs);
90  }
91  branchIDListHelper.updateFromInput(header.branchIDLists());
92  thinnedHelper.updateFromInput(header.thinnedAssociationsHelper(), false, std::vector<BranchID>());
93  }
94  }
95 
96  void
98  for(auto const& item : descs) {
99  //pi->init();
100  std::string const real_name = wrappedClassName(item.className());
101  FDEBUG(6) << "declare: " << real_name << std::endl;
102  loadCap(real_name);
103  }
104  }
105 
106 
107  void
109  for(auto const& item : descs) {
110  //pi->init();
111  std::string const real_name = wrappedClassName(item.className());
112  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
113  doBuildRealData(real_name);
114  }
115  }
116 
121  std::auto_ptr<SendJobHeader>
123  if(initView.code() != Header::INIT)
124  throw cms::Exception("StreamTranslation","Registry deserialization error")
125  << "received wrong message type: expected INIT, got "
126  << initView.code() << "\n";
127 
128  //Get the process name and store if for Protocol version 4 and above.
129  if (initView.protocolVersion() > 3) {
130 
131  processName_ = initView.processName();
132  protocolVersion_ = initView.protocolVersion();
133 
134  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
135  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
136  }
137 
138  // calculate the adler32 checksum
139  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(),initView.descLength());
140  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
141  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
142  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
143  if((uint32)adler32_chksum != initView.adler32_chksum()) {
144  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
145  << " chksum from registry data = " << adler32_chksum << " from header = "
146  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
147  // skip event (based on option?) or throw exception?
148  }
149 
150  TClass* desc = getTClass(typeid(SendJobHeader));
151 
152  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
153  const_cast<char*>((char const*)initView.descData()),kFALSE);
154  RootDebug tracer(10,10);
155  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
156 
157  if(sd.get() == nullptr) {
158  throw cms::Exception("StreamTranslation","Registry deserialization error")
159  << "Could not read the initial product registry list\n";
160  }
161 
162  sd->initializeTransients();
163  return sd;
164  }
165 
170  void
172  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
174  if (subsequent) {
176  }
177  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
178  pset::Registry& psetRegistry = *pset::Registry::instance();
179  for (auto const& item : psetMap) {
180  ParameterSet pset(item.second.pset());
181  pset.setID(item.first);
182  psetRegistry.insertMapped(pset);
183  }
184  }
185 
189  void
191  if(eventView.code() != Header::EVENT)
192  throw cms::Exception("StreamTranslation","Event deserialization error")
193  << "received wrong message type: expected EVENT, got "
194  << eventView.code() << "\n";
195  FDEBUG(9) << "Decode event: "
196  << eventView.event() << " "
197  << eventView.run() << " "
198  << eventView.size() << " "
199  << eventView.adler32_chksum() << " "
200  << eventView.eventLength() << " "
201  << eventView.eventData()
202  << std::endl;
203  // uncompress if we need to
204  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
205  // need to get rid of this when 090 MTCC streamers are gotten rid of
206  unsigned long origsize = eventView.origDataSize();
207  unsigned long dest_size; //(should be >= eventView.origDataSize())
208 
209  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
210  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
211  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
212  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
213  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
214  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
215  << " chksum from event = " << adler32_chksum << " from header = "
216  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
217  // skip event (based on option?) or throw exception?
218  }
219  if(origsize != 78 && origsize != 0) {
220  // compressed
221  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
222  eventView.eventLength(), dest_, origsize);
223  } else { // not compressed
224  // we need to copy anyway the buffer as we are using dest in xbuf
225  dest_size = eventView.eventLength();
226  dest_.resize(dest_size);
227  unsigned char* pos = (unsigned char*) &dest_[0];
228  unsigned char const* from = (unsigned char const*) eventView.eventData();
229  std::copy(from,from+dest_size,pos);
230  }
231  //TBuffer xbuf(TBuffer::kRead, dest_size,
232  // (char const*) &dest[0],kFALSE);
233  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
234  // (char const*) eventView.eventData(),kFALSE);
235  xbuf_.Reset();
236  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
237  RootDebug tracer(10,10);
238 
239  //We do not yet know which EventPrincipal we will use, therefore
240  // we are using a new EventPrincipalHolder as a proxy. We need to
241  // make a new one instead of reusing the same one becuase when running
242  // multi-threaded there will be multiple EventPrincipals being used
243  // simultaneously.
246  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
248 
249  if(sendEvent_.get() == nullptr) {
250  throw cms::Exception("StreamTranslation","Event deserialization error")
251  << "got a null event from input stream\n";
252  }
253  processHistoryRegistryUpdate().registerProcessHistory(sendEvent_->processHistory());
254 
255  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
256  if(runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
257  RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
258  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
259  setRunAuxiliary(runAuxiliary);
261  }
264  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
265  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
266  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
267  }
268  setEventCached();
269  }
270 
271  void
275  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
276  assert(eventOK);
278  }
279  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
280  BranchListIndexes indexes(sendEvent_->branchListIndexes());
281  branchIDListHelper()->fixBranchListIndexes(indexes);
282  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
283 
284  //We now know which eventPrincipal to use and we can reuse the slot in
285  // streamToEventPrincipalHolders to own the memory
286  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
287  if(streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() +1) {
288  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() +1);
289  }
291 
292  // no process name list handling
293 
294  SendProds& sps = sendEvent_->products();
295  for(auto& spitem : sps) {
296  FDEBUG(10) << "check prodpair" << std::endl;
297  if(spitem.desc() == nullptr)
298  throw cms::Exception("StreamTranslation","Empty Provenance");
299  FDEBUG(5) << "Prov:"
300  << " " << spitem.desc()->className()
301  << " " << spitem.desc()->productInstanceName()
302  << " " << spitem.desc()->branchID()
303  << std::endl;
304 
305  BranchDescription const branchDesc(*spitem.desc());
306  // This ProductProvenance constructor inserts into the entry description registry
307  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
308 
309  if(spitem.prod() != nullptr) {
310  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
311  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
312  FDEBUG(10) << "addproduct done" << std::endl;
313  } else {
314  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
315  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
316  FDEBUG(10) << "addproduct empty done" << std::endl;
317  }
318  spitem.clear();
319  }
320 
321  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
322  }
323 
332  unsigned int
333  StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
334  unsigned int inputSize,
335  std::vector<unsigned char>& outputBuffer,
336  unsigned int expectedFullSize) {
337  unsigned long origSize = expectedFullSize;
338  unsigned long uncompressedSize = expectedFullSize*1.1;
339  FDEBUG(1) << "Uncompress: original size = " << origSize
340  << ", compressed size = " << inputSize
341  << std::endl;
342  outputBuffer.resize(uncompressedSize);
343  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
344  inputBuffer, inputSize); // do not need compression level
345  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
346  if(ret == Z_OK) {
347  // check the length against original uncompressed length
348  FDEBUG(10) << " original size = " << origSize << " final size = "
349  << uncompressedSize << std::endl;
350  if(origSize != uncompressedSize) {
351  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
352  << origSize << " uncompress size = " << uncompressedSize << std::endl;
353  // we throw an error and return without event! null pointer
354  throw cms::Exception("StreamDeserialization","Uncompression error")
355  << "mismatch event lengths should be" << origSize << " got "
356  << uncompressedSize << "\n";
357  }
358  } else {
359  // we throw an error and return without event! null pointer
360  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
361  << ret << std::endl;
362  throw cms::Exception("StreamDeserialization","Uncompression error")
363  << "Error code = " << ret << "\n ";
364  }
365  return (unsigned int) uncompressedSize;
366  }
367 
369  // called from an online streamer source to reset after a stop command
370  // so an enable command will work
373  assert(!eventCached());
374  reset();
375  }
376 
378  // Need to define a dummy setRun here or else the InputSource::setRun is called
379  // if we have a source inheriting from this and wants to define a setRun method
381  << "StreamerInputSource::setRun()\n"
382  << "Run number cannot be modified for this type of Input Source\n"
383  << "Contact a Storage Manager Developer\n";
384  }
385 
387 
389 
390  WrapperBase const*
392  return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
393  }
394 
395  WrapperBase const*
397  return eventPrincipal_ ? eventPrincipal_->getThinnedProduct(id, index) : nullptr;
398  }
399 
400  void
402  std::vector<WrapperBase const*>& wrappers,
403  std::vector<unsigned int>& keys) const {
404  if (eventPrincipal_) eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
405  }
406 
407 
408  unsigned int
410  assert(eventPrincipal_ != nullptr);
411  return eventPrincipal_->transitionIndex();
412  }
413 
414  void
416  eventPrincipal_ = ep;
417  }
418 
419  void
422  }
423 }
void setID(ParameterSetID const &id)
ProcessHistoryRegistry const & processHistoryRegistry() const
Const accessor for process history registry.
Definition: InputSource.h:172
static void fillDescription(ParameterSetDescription &description)
size_t size() const
Definition: Principal.cc:248
uint32 lumi() const
Definition: EventMessage.cc:85
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
const uint8 * eventData() const
Definition: EventMessage.h:82
const uint8 * descData() const
Definition: InitMessage.h:93
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
std::string hostName() const
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:34
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
assert(m_qm.get())
std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper() const
Accessor for thinnedAssociationsHelper.
Definition: InputSource.h:181
bool registerProcessHistory(ProcessHistory const &processHistory)
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:362
void setRefCoreStreamer(bool resetAll=false)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:600
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
std::unique_ptr< EventPrincipalHolder > eventPrincipalHolder_
#define nullptr
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void setRun(RunNumber_t r)
void updateFromInput(ThinnedAssociationsHelper const &, bool isSecondaryFile, std::vector< BranchID > const &associationsFromSecondary)
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:79
std::vector< EventSelectionID > EventSelectionIDVector
uint32 run() const
Definition: EventMessage.cc:73
uint32 adler32_chksum() const
Definition: EventMessage.h:99
uint32 eventLength() const
Definition: EventMessage.h:84
static void declareStreamers(SendDescs const &descs)
uint32 adler32_chksum() const
Definition: InitMessage.h:96
virtual WrapperBase const * getIt(ProductID const &id) const override
std::vector< BranchListIndex > BranchListIndexes
static void buildClassCache(SendDescs const &descs)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:358
std::shared_ptr< BranchIDListHelper > branchIDListHelper() const
Accessor for branchIDListHelper.
Definition: InputSource.h:178
uint32 code() const
Definition: EventMessage.h:79
std::unique_ptr< SendEvent > sendEvent_
def move
Definition: eostools.py:508
StreamID streamID() const
void deserializeEvent(EventMsgView const &eventView)
std::string hostName() const
Definition: InitMessage.cc:190
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
void setProcessHistoryID(ProcessHistoryID const &phid)
virtual WrapperBase const * getThinnedProduct(ProductID const &, unsigned int &) const override
std::vector< StreamedProduct > SendProds
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:385
virtual unsigned int transitionIndex_() const override
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
ThinnedAssociationsHelper const & thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:606
uint32 protocolVersion() const
Definition: InitMessage.cc:110
unsigned int value() const
Definition: StreamID.h:46
double sd
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:351
std::string wrappedClassName(std::string const &iFullName)
tuple pid
Definition: sysUtil.py:22
uint32 size() const
Definition: EventMessage.h:80
BranchIDLists const & branchIDLists() const
uint32 origDataSize() const
Definition: EventMessage.cc:91
bool updateFromInput(BranchIDLists const &bidlists)
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:263
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:366
virtual void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &wrappers, std::vector< unsigned int > &keys) const override
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
std::auto_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
void reset() const
Definition: InputSource.h:370
uint32 code() const
Definition: InitMessage.h:72
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:354
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessor for product registry.
Definition: InputSource.h:169
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:263
ProcessHistoryRegistry & processHistoryRegistryUpdate() const
Definition: InputSource.h:352
std::string processName() const
Definition: InitMessage.cc:127
void updateFromInput(ProductList const &other)
const int init_size
void setProcessHistoryID(ProcessHistoryID const &phid)
Definition: RunAuxiliary.h:36
unsigned int RunNumber_t
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:853
uint32 descLength() const
Definition: InitMessage.h:92
uint64 event() const
Definition: EventMessage.cc:79
volatile std::atomic< bool > shutdown_flag false
bool insertMapped(value_type const &v)
Definition: Registry.cc:39
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:266
SendDescs const & descs() const
static void fillDescription(ParameterSetDescription &description)
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
static Registry * instance()
Definition: Registry.cc:16
virtual void read(EventPrincipal &eventPrincipal)
void loadExtraClasses()
Definition: ClassFiller.cc:46
std::vector< std::unique_ptr< EventPrincipalHolder > > streamToEventPrincipalHolders_
virtual std::unique_ptr< FileBlock > readFile_()
bool eventCached() const
Definition: InputSource.h:383
std::vector< unsigned char > dest_
void loadCap(const std::string &name)
Definition: ClassFiller.cc:27