CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
17 
18 #include "zlib.h"
19 
27 
31 
32 #include <string>
33 #include <iostream>
34 #include <set>
35 
36 namespace edm {
37  namespace {
38  int const init_size = 1024*1024;
39  }
40 
42  ParameterSet const& pset,
43  InputSourceDescription const& desc):
44  RawInputSource(pset, desc),
45  tc_(getTClass(typeid(SendEvent))),
46  dest_(init_size),
47  xbuf_(TBuffer::kRead, init_size),
48  sendEvent_(),
49  eventPrincipalHolder_(),
50  adjustEventToNewProductRegistry_(false),
51  processName_(),
52  protocolVersion_(0U) {
53  }
54 
56 
57  // ---------------------------------------
58  std::unique_ptr<FileBlock>
60  return std::unique_ptr<FileBlock>(new FileBlock);
61  }
62 
63  void
64  StreamerInputSource::mergeIntoRegistry(SendJobHeader const& header, ProductRegistry& reg, BranchIDListHelper& branchIDListHelper, bool subsequent) {
65 
66  SendDescs const& descs = header.descs();
67 
68  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
69 
70  if (subsequent) {
71  ProductRegistry pReg;
72  pReg.updateFromInput(descs);
74  if (!mergeInfo.empty()) {
75  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
76  }
77  branchIDListHelper.updateFromInput(header.branchIDLists());
78  } else {
79  declareStreamers(descs);
80  buildClassCache(descs);
82  if(!reg.frozen()) {
83  reg.updateFromInput(descs);
84  }
85  branchIDListHelper.updateFromInput(header.branchIDLists());
86  }
87  }
88 
89  void
91  for(auto const& item : descs) {
92  //pi->init();
93  std::string const real_name = wrappedClassName(item.className());
94  FDEBUG(6) << "declare: " << real_name << std::endl;
95  loadCap(real_name);
96  }
97  }
98 
99 
100  void
102  for(auto const& item : descs) {
103  //pi->init();
104  std::string const real_name = wrappedClassName(item.className());
105  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
106  doBuildRealData(real_name);
107  }
108  }
109 
114  std::auto_ptr<SendJobHeader>
116  if(initView.code() != Header::INIT)
117  throw cms::Exception("StreamTranslation","Registry deserialization error")
118  << "received wrong message type: expected INIT, got "
119  << initView.code() << "\n";
120 
121  //Get the process name and store if for Protocol version 4 and above.
122  if (initView.protocolVersion() > 3) {
123 
124  processName_ = initView.processName();
125  protocolVersion_ = initView.protocolVersion();
126 
127  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
128  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
129  }
130 
131  // calculate the adler32 checksum
132  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(),initView.descLength());
133  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
134  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
135  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
136  if((uint32)adler32_chksum != initView.adler32_chksum()) {
137  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
138  << " chksum from registry data = " << adler32_chksum << " from header = "
139  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
140  // skip event (based on option?) or throw exception?
141  }
142 
143  TClass* desc = getTClass(typeid(SendJobHeader));
144 
145  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
146  const_cast<char*>((char const*)initView.descData()),kFALSE);
147  RootDebug tracer(10,10);
148  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
149 
150  if(sd.get() == nullptr) {
151  throw cms::Exception("StreamTranslation","Registry deserialization error")
152  << "Could not read the initial product registry list\n";
153  }
154 
155  sd->initializeTransients();
156  return sd;
157  }
158 
163  void
165  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
167  if (subsequent) {
169  }
170  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
171  pset::Registry& psetRegistry = *pset::Registry::instance();
172  for (auto const& item : psetMap) {
173  ParameterSet pset(item.second.pset());
174  pset.setID(item.first);
175  psetRegistry.insertMapped(pset);
176  }
177  }
178 
182  void
184  if(eventView.code() != Header::EVENT)
185  throw cms::Exception("StreamTranslation","Event deserialization error")
186  << "received wrong message type: expected EVENT, got "
187  << eventView.code() << "\n";
188  FDEBUG(9) << "Decode event: "
189  << eventView.event() << " "
190  << eventView.run() << " "
191  << eventView.size() << " "
192  << eventView.adler32_chksum() << " "
193  << eventView.eventLength() << " "
194  << eventView.eventData()
195  << std::endl;
196  // uncompress if we need to
197  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
198  // need to get rid of this when 090 MTCC streamers are gotten rid of
199  unsigned long origsize = eventView.origDataSize();
200  unsigned long dest_size; //(should be >= eventView.origDataSize())
201 
202  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
203  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
204  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
205  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
206  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
207  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
208  << " chksum from event = " << adler32_chksum << " from header = "
209  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
210  // skip event (based on option?) or throw exception?
211  }
212  if(origsize != 78 && origsize != 0) {
213  // compressed
214  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
215  eventView.eventLength(), dest_, origsize);
216  } else { // not compressed
217  // we need to copy anyway the buffer as we are using dest in xbuf
218  dest_size = eventView.eventLength();
219  dest_.resize(dest_size);
220  unsigned char* pos = (unsigned char*) &dest_[0];
221  unsigned char const* from = (unsigned char const*) eventView.eventData();
222  std::copy(from,from+dest_size,pos);
223  }
224  //TBuffer xbuf(TBuffer::kRead, dest_size,
225  // (char const*) &dest[0],kFALSE);
226  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
227  // (char const*) eventView.eventData(),kFALSE);
228  xbuf_.Reset();
229  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
230  RootDebug tracer(10,10);
231 
233  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
235 
236  if(sendEvent_.get() == nullptr) {
237  throw cms::Exception("StreamTranslation","Event deserialization error")
238  << "got a null event from input stream\n";
239  }
241 
242  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
243  if(runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
245  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
246  setRunAuxiliary(runAuxiliary);
248  }
251  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
252  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
253  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
254  }
255  setEventCached();
256  }
257 
258  void
262  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
263  assert(eventOK);
265  }
266  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
267  BranchListIndexes indexes(sendEvent_->branchListIndexes());
268  branchIDListHelper()->fixBranchListIndexes(indexes);
269  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
270  eventPrincipalHolder_.setEventPrincipal(&eventPrincipal);
271 
272  // no process name list handling
273 
274  SendProds& sps = sendEvent_->products();
275  for(auto& spitem : sps) {
276  FDEBUG(10) << "check prodpair" << std::endl;
277  if(spitem.desc() == nullptr)
278  throw cms::Exception("StreamTranslation","Empty Provenance");
279  FDEBUG(5) << "Prov:"
280  << " " << spitem.desc()->className()
281  << " " << spitem.desc()->productInstanceName()
282  << " " << spitem.desc()->branchID()
283  << std::endl;
284 
285  BranchDescription const branchDesc(*spitem.desc());
286  // This ProductProvenance constructor inserts into the entry description registry
287  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
288 
289  if(spitem.prod() != nullptr) {
290  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
291  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
292  FDEBUG(10) << "addproduct done" << std::endl;
293  } else {
294  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
295  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
296  FDEBUG(10) << "addproduct empty done" << std::endl;
297  }
298  spitem.clear();
299  }
300 
301  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
302  }
303 
312  unsigned int
313  StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
314  unsigned int inputSize,
315  std::vector<unsigned char>& outputBuffer,
316  unsigned int expectedFullSize) {
317  unsigned long origSize = expectedFullSize;
318  unsigned long uncompressedSize = expectedFullSize*1.1;
319  FDEBUG(1) << "Uncompress: original size = " << origSize
320  << ", compressed size = " << inputSize
321  << std::endl;
322  outputBuffer.resize(uncompressedSize);
323  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
324  inputBuffer, inputSize); // do not need compression level
325  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
326  if(ret == Z_OK) {
327  // check the length against original uncompressed length
328  FDEBUG(10) << " original size = " << origSize << " final size = "
329  << uncompressedSize << std::endl;
330  if(origSize != uncompressedSize) {
331  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
332  << origSize << " uncompress size = " << uncompressedSize << std::endl;
333  // we throw an error and return without event! null pointer
334  throw cms::Exception("StreamDeserialization","Uncompression error")
335  << "mismatch event lengths should be" << origSize << " got "
336  << uncompressedSize << "\n";
337  }
338  } else {
339  // we throw an error and return without event! null pointer
340  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
341  << ret << std::endl;
342  throw cms::Exception("StreamDeserialization","Uncompression error")
343  << "Error code = " << ret << "\n ";
344  }
345  return (unsigned int) uncompressedSize;
346  }
347 
349  // called from an online streamer source to reset after a stop command
350  // so an enable command will work
353  assert(!eventCached());
354  reset();
355  }
356 
358  // Need to define a dummy setRun here or else the InputSource::setRun is called
359  // if we have a source inheriting from this and wants to define a setRun method
361  << "StreamerInputSource::setRun()\n"
362  << "Run number cannot be modified for this type of Input Source\n"
363  << "Contact a Storage Manager Developer\n";
364  }
365 
367 
369 
370  WrapperBase const*
372  return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
373  }
374 
375  unsigned int
377  assert(eventPrincipal_ != nullptr);
378  return eventPrincipal_->transitionIndex();
379  }
380 
381  void
383  eventPrincipal_ = ep;
384  }
385 
386  void
389  }
390 }
void setID(ParameterSetID const &id)
ProcessHistoryRegistry const & processHistoryRegistry() const
Const accessor for process history registry.
Definition: InputSource.h:171
static void fillDescription(ParameterSetDescription &description)
size_t size() const
Definition: Principal.cc:238
uint32 lumi() const
Definition: EventMessage.cc:85
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, bool subsequent)
static Timestamp invalidTimestamp()
Definition: Timestamp.h:101
const uint8 * eventData() const
Definition: EventMessage.h:80
const uint8 * descData() const
Definition: InitMessage.h:91
std::string hostName() const
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:31
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
#define nullptr
bool registerProcessHistory(ProcessHistory const &processHistory)
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:358
void setRefCoreStreamer(bool resetAll=false)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:599
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const &productProvenance)
#define FDEBUG(lev)
Definition: DebugMacros.h:18
virtual void setRun(RunNumber_t r)
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:78
const int init_size
std::vector< EventSelectionID > EventSelectionIDVector
uint32 run() const
Definition: EventMessage.cc:73
uint32 adler32_chksum() const
Definition: EventMessage.h:97
uint32 eventLength() const
Definition: EventMessage.h:82
static void declareStreamers(SendDescs const &descs)
uint32 adler32_chksum() const
Definition: InitMessage.h:94
std::vector< BranchListIndex > BranchListIndexes
static void buildClassCache(SendDescs const &descs)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:354
std::shared_ptr< BranchIDListHelper > branchIDListHelper() const
Accessor for branchIDListHelper.
Definition: InputSource.h:177
uint32 code() const
Definition: EventMessage.h:77
std::unique_ptr< SendEvent > sendEvent_
void deserializeEvent(EventMsgView const &eventView)
std::string hostName() const
Definition: InitMessage.cc:190
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
void setProcessHistoryID(ProcessHistoryID const &phid)
EventPrincipalHolder eventPrincipalHolder_
virtual WrapperBase const * getIt(edm::ProductID const &id) const override
uint32 event() const
Definition: EventMessage.cc:79
std::vector< StreamedProduct > SendProds
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=nullptr)
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:381
virtual unsigned int transitionIndex_() const override
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:605
uint32 protocolVersion() const
Definition: InitMessage.cc:110
double sd
ProductRegistry & productRegistryUpdate() const
Definition: InputSource.h:347
std::string wrappedClassName(std::string const &iFullName)
uint32 size() const
Definition: EventMessage.h:78
BranchIDLists const & branchIDLists() const
uint32 origDataSize() const
Definition: EventMessage.cc:91
bool updateFromInput(BranchIDLists const &bidlists)
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:259
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:362
std::auto_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
void reset() const
Definition: InputSource.h:366
uint32 code() const
Definition: InitMessage.h:70
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:350
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessor for product registry.
Definition: InputSource.h:168
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:253
ProcessHistoryRegistry & processHistoryRegistryUpdate() const
Definition: InputSource.h:348
std::string processName() const
Definition: InitMessage.cc:127
void updateFromInput(ProductList const &other)
void setProcessHistoryID(ProcessHistoryID const &phid)
Definition: RunAuxiliary.h:36
unsigned int RunNumber_t
Definition: EventRange.h:32
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:830
uint32 descLength() const
Definition: InitMessage.h:90
volatile std::atomic< bool > shutdown_flag false
bool insertMapped(value_type const &v)
Definition: Registry.cc:37
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:262
SendDescs const & descs() const
static void fillDescription(ParameterSetDescription &description)
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
static Registry * instance()
Definition: Registry.cc:14
virtual void read(EventPrincipal &eventPrincipal)
void loadExtraClasses()
Definition: ClassFiller.cc:43
virtual std::unique_ptr< FileBlock > readFile_()
bool eventCached() const
Definition: InputSource.h:379
std::vector< unsigned char > dest_
void loadCap(const std::string &name)
Definition: ClassFiller.cc:20