CMS 3D CMS Logo

All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 #include "lzma.h"
21 #include "zstd.h"
22 
31 
35 
36 #include <string>
37 #include <iostream>
38 #include <set>
39 
40 namespace edm {
41  namespace {
42  int const init_size = 1024 * 1024;
43  }
44 
46  : RawInputSource(pset, desc),
47  tc_(getTClass(typeid(SendEvent))),
48  dest_(init_size),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
51  eventPrincipalHolder_(),
52  adjustEventToNewProductRegistry_(false),
53  processName_(),
54  protocolVersion_(0U) {}
55 
57 
58  // ---------------------------------------
59  std::unique_ptr<FileBlock> StreamerInputSource::readFile_() { return std::make_unique<FileBlock>(); }
60 
62  ProductRegistry& reg,
64  ThinnedAssociationsHelper& thinnedHelper,
65  bool subsequent) {
66  SendDescs const& descs = header.descs();
67 
68  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
69 
70  if (subsequent) {
71  ProductRegistry pReg;
72  pReg.updateFromInput(descs);
74  if (!mergeInfo.empty()) {
75  throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
76  }
77  branchIDListHelper.updateFromInput(header.branchIDLists());
78  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
79  } else {
80  declareStreamers(descs);
81  buildClassCache(descs);
83  if (!reg.frozen()) {
84  reg.updateFromInput(descs);
85  }
86  branchIDListHelper.updateFromInput(header.branchIDLists());
87  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
88  }
89  }
90 
92  std::vector<std::string> missingDictionaries;
93  std::vector<std::string> branchNamesForMissing;
94  std::vector<std::string> producedTypes;
95  for (auto const& item : descs) {
96  //pi->init();
97  std::string const real_name = wrappedClassName(item.className());
98  FDEBUG(6) << "declare: " << real_name << std::endl;
99  if (!loadCap(real_name, missingDictionaries)) {
100  branchNamesForMissing.emplace_back(item.branchName());
101  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
102  }
103  }
104  if (!missingDictionaries.empty()) {
105  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
106  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
107  }
108  }
109 
111  for (auto const& item : descs) {
112  //pi->init();
113  std::string const real_name = wrappedClassName(item.className());
114  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
115  doBuildRealData(real_name);
116  }
117  }
118 
123  std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
124  if (initView.code() != Header::INIT)
125  throw cms::Exception("StreamTranslation", "Registry deserialization error")
126  << "received wrong message type: expected INIT, got " << initView.code() << "\n";
127 
128  //Get the process name and store if for Protocol version 4 and above.
129  if (initView.protocolVersion() > 3) {
130  processName_ = initView.processName();
131  protocolVersion_ = initView.protocolVersion();
132 
133  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
134  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
135  }
136 
137  // calculate the adler32 checksum
138  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
139  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
140  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
141  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
142  if ((uint32)adler32_chksum != initView.adler32_chksum()) {
143  // skip event (based on option?) or throw exception?
144  throw cms::Exception("StreamDeserialization", "Checksum error")
145  << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
146  << " host name = " << initView.hostName() << std::endl;
147  }
148 
149  TClass* desc = getTClass(typeid(SendJobHeader));
150 
151  TBufferFile xbuf(
152  TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
153  RootDebug tracer(10, 10);
154  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
155 
156  if (sd.get() == nullptr) {
157  throw cms::Exception("StreamTranslation", "Registry deserialization error")
158  << "Could not read the initial product registry list\n";
159  }
160 
161  sd->initializeTransients();
162  return sd;
163  }
164 
169  void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
170  std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
172  if (subsequent) {
174  }
175  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
176  pset::Registry& psetRegistry = *pset::Registry::instance();
177  for (auto const& item : psetMap) {
178  ParameterSet pset(item.second.pset());
179  pset.setID(item.first);
180  psetRegistry.insertMapped(pset);
181  }
182  }
183 
188  if (eventView.code() != Header::EVENT)
189  throw cms::Exception("StreamTranslation", "Event deserialization error")
190  << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
191  FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
192  << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
193  << std::endl;
194  // uncompress if we need to
195  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
196  // need to get rid of this when 090 MTCC streamers are gotten rid of
197  unsigned long origsize = eventView.origDataSize();
198  unsigned long dest_size; //(should be >= eventView.origDataSize())
199 
200  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
201  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
202  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
203  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
204  if ((uint32)adler32_chksum != eventView.adler32_chksum()) {
205  // skip event (based on option?) or throw exception?
206  throw cms::Exception("StreamDeserialization", "Checksum error")
207  << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
208  << " host name = " << eventView.hostName() << std::endl;
209  }
210  if (origsize != 78 && origsize != 0) {
211  // compressed
212  if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
213  dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
214  eventView.eventLength(),
215  dest_,
216  origsize);
217  } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
218  dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
219  eventView.eventLength(),
220  dest_,
221  origsize);
222  } else
223  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
224  eventView.eventLength(),
225  dest_,
226  origsize);
227  } else { // not compressed
228  // we need to copy anyway the buffer as we are using dest in xbuf
229  dest_size = eventView.eventLength();
230  dest_.resize(dest_size);
231  unsigned char* pos = (unsigned char*)&dest_[0];
232  unsigned char const* from = (unsigned char const*)eventView.eventData();
233  std::copy(from, from + dest_size, pos);
234  }
235  //TBuffer xbuf(TBuffer::kRead, dest_size,
236  // (char const*) &dest[0],kFALSE);
237  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
238  // (char const*) eventView.eventData(),kFALSE);
239  xbuf_.Reset();
240  xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
241  RootDebug tracer(10, 10);
242 
243  //We do not yet know which EventPrincipal we will use, therefore
244  // we are using a new EventPrincipalHolder as a proxy. We need to
245  // make a new one instead of reusing the same one becuase when running
246  // multi-threaded there will be multiple EventPrincipals being used
247  // simultaneously.
248  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
250  {
251  std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
253  ;
254  });
255  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
256  }
257 
258  if (sendEvent_.get() == nullptr) {
259  throw cms::Exception("StreamTranslation", "Event deserialization error")
260  << "got a null event from input stream\n";
261  }
263 
264  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
265  if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
267  new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
268  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
269  setRunAuxiliary(runAuxiliary);
271  }
274  runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
275  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
276  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
277  }
278  setEventCached();
279  }
280 
284  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
285  assert(eventOK);
287  }
288  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
289  BranchListIndexes indexes(sendEvent_->branchListIndexes());
290  branchIDListHelper()->fixBranchListIndexes(indexes);
291  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
292 
293  //We now know which eventPrincipal to use and we can reuse the slot in
294  // streamToEventPrincipalHolders to own the memory
295  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
296  if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
297  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
298  }
300 
301  // no process name list handling
302 
303  SendProds& sps = sendEvent_->products();
304  for (auto& spitem : sps) {
305  FDEBUG(10) << "check prodpair" << std::endl;
306  if (spitem.desc() == nullptr)
307  throw cms::Exception("StreamTranslation", "Empty Provenance");
308  FDEBUG(5) << "Prov:"
309  << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
310  << spitem.desc()->branchID() << std::endl;
311 
312  BranchDescription const branchDesc(*spitem.desc());
313  // This ProductProvenance constructor inserts into the entry description registry
314  if (spitem.parents()) {
315  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
316  if (spitem.prod() != nullptr) {
317  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
318  eventPrincipal.putOnRead(
319  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), &productProvenance);
320  FDEBUG(10) << "addproduct done" << std::endl;
321  } else {
322  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
323  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), &productProvenance);
324  FDEBUG(10) << "addproduct empty done" << std::endl;
325  }
326  } else {
327  ProductProvenance const* productProvenance = nullptr;
328  if (spitem.prod() != nullptr) {
329  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
330  eventPrincipal.putOnRead(
331  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
332  FDEBUG(10) << "addproduct done" << std::endl;
333  } else {
334  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
335  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
336  FDEBUG(10) << "addproduct empty done" << std::endl;
337  }
338  }
339  spitem.clear();
340  }
341 
342  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
343  }
344 
353  unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
354  unsigned int inputSize,
355  std::vector<unsigned char>& outputBuffer,
356  unsigned int expectedFullSize) {
357  unsigned long origSize = expectedFullSize;
358  unsigned long uncompressedSize = expectedFullSize * 1.1;
359  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
360  outputBuffer.resize(uncompressedSize);
361  int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize); // do not need compression level
362  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
363  if (ret == Z_OK) {
364  // check the length against original uncompressed length
365  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
366  if (origSize != uncompressedSize) {
367  // we throw an error and return without event! null pointer
368  throw cms::Exception("StreamDeserialization", "Uncompression error")
369  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
370  }
371  } else {
372  // we throw an error and return without event! null pointer
373  throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
374  }
375  return (unsigned int)uncompressedSize;
376  }
377 
378  bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
379  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
380  return true;
381  else
382  return false;
383  }
384 
385  unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
386  unsigned int inputSize,
387  std::vector<unsigned char>& outputBuffer,
388  unsigned int expectedFullSize,
389  bool hasHeader) {
390  unsigned long origSize = expectedFullSize;
391  unsigned long uncompressedSize = expectedFullSize * 1.1;
392  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
393  outputBuffer.resize(uncompressedSize);
394 
395  lzma_stream stream = LZMA_STREAM_INIT;
396  lzma_ret returnStatus;
397 
398  returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
399  if (returnStatus != LZMA_OK) {
400  throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
401  << "Error code = " << returnStatus << "\n ";
402  }
403 
404  size_t hdrSize = hasHeader ? 4 : 0;
405  stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
406  stream.avail_in = (size_t)(inputSize - hdrSize);
407  stream.next_out = (uint8_t*)&outputBuffer[0];
408  stream.avail_out = (size_t)uncompressedSize;
409 
410  returnStatus = lzma_code(&stream, LZMA_FINISH);
411  if (returnStatus != LZMA_STREAM_END) {
412  lzma_end(&stream);
413  throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
414  << "Error code = " << returnStatus << "\n ";
415  }
416  lzma_end(&stream);
417 
418  uncompressedSize = (unsigned int)stream.total_out;
419 
420  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
421  if (origSize != uncompressedSize) {
422  // we throw an error and return without event! null pointer
423  throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
424  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
425  }
426 
427  return uncompressedSize;
428  }
429 
430  bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
431  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
432  return true;
433  else
434  return false;
435  }
436 
437  unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
438  unsigned int inputSize,
439  std::vector<unsigned char>& outputBuffer,
440  unsigned int expectedFullSize,
441  bool hasHeader) {
442  unsigned long uncompressedSize = expectedFullSize * 1.1;
443  FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
444  outputBuffer.resize(uncompressedSize);
445 
446  size_t hdrSize = hasHeader ? 4 : 0;
447  size_t ret = ZSTD_decompress(
448  (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
449 
450  if (ZSTD_isError(ret)) {
451  throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
452  << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
453  }
454  return (unsigned int)ret;
455  }
456 
458  // called from an online streamer source to reset after a stop command
459  // so an enable command will work
462  assert(!eventCached());
463  reset();
464  }
465 
467  // Need to define a dummy setRun here or else the InputSource::setRun is called
468  // if we have a source inheriting from this and wants to define a setRun method
469  throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
470  << "Run number cannot be modified for this type of Input Source\n"
471  << "Contact a Storage Manager Developer\n";
472  }
473 
475 
477 
479  return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
480  }
481 
483  unsigned int& index) const {
484  return eventPrincipal_ ? eventPrincipal_->getThinnedProduct(id, index) : nullptr;
485  }
486 
488  std::vector<WrapperBase const*>& wrappers,
489  std::vector<unsigned int>& keys) const {
490  if (eventPrincipal_)
491  eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
492  }
493 
495  assert(eventPrincipal_ != nullptr);
497  }
498 
500 
502 } // namespace edm
void read(EventPrincipal &eventPrincipal) override
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:155
static void fillDescription(ParameterSetDescription &description)
size_t size() const
Definition: Principal.cc:299
uint32 lumi() const
Definition: EventMessage.cc:81
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
WrapperBase const * getThinnedProduct(ProductID const &pid, unsigned int &key) const override
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
const uint8 * eventData() const
Definition: EventMessage.h:79
const uint8 * descData() const
Definition: InitMessage.h:86
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
std::string hostName() const
ret
prodAgent to be discontinued
std::vector< BranchDescription > SendDescs
void doBuildRealData(const std::string &name)
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
bool isBufferZSTD(unsigned char const *inputBuffer, unsigned int inputSize)
bool isBufferLZMA(unsigned char const *inputBuffer, unsigned int inputSize)
#define nullptr
bool registerProcessHistory(ProcessHistory const &processHistory)
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:337
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
void setRefCoreStreamer(bool resetAll=false)
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:436
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
void setRun(RunNumber_t r) override
void updateFromPrimaryInput(ThinnedAssociationsHelper const &)
std::vector< EventSelectionID > EventSelectionIDVector
uint32 run() const
Definition: EventMessage.cc:71
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &foundContainers, std::vector< unsigned int > &keys) const override
uint32 adler32_chksum() const
Definition: EventMessage.h:96
uint32 eventLength() const
Definition: EventMessage.h:81
std::unique_ptr< FileBlock > readFile_() override
static void declareStreamers(SendDescs const &descs)
uint32 adler32_chksum() const
Definition: InitMessage.h:89
std::vector< BranchListIndex > BranchListIndexes
static void buildClassCache(SendDescs const &descs)
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
uint32 code() const
Definition: EventMessage.h:76
StreamID streamID() const
void deserializeEvent(EventMsgView const &eventView)
std::string hostName() const
Definition: InitMessage.cc:184
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &wrappers, std::vector< unsigned int > &keys) const override
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
bool insertMapped(value_type const &v, bool forceUpdate=false)
Definition: Registry.cc:32
static unsigned int uncompressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
void setProcessHistoryID(ProcessHistoryID const &phid)
std::vector< StreamedProduct > SendProds
unsigned int uint32
Definition: MsgTools.h:13
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:358
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
ThinnedAssociationsHelper const & thinnedAssociationsHelper() const
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:441
uint32 protocolVersion() const
Definition: InitMessage.cc:109
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
unsigned int value() const
Definition: StreamID.h:42
double sd
std::string wrappedClassName(std::string const &iFullName)
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, ProductProvenance const *productProvenance) const
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
uint32 size() const
Definition: EventMessage.h:77
BranchIDLists const & branchIDLists() const
TClass * getTClass(const std::type_info &ti)
uint32 origDataSize() const
Definition: EventMessage.cc:86
unsigned int transitionIndex() const
WrapperBase const * getThinnedProduct(ProductID const &, unsigned int &) const override
bool updateFromInput(BranchIDLists const &bidlists)
WrapperBase const * getIt(ProductID const &id) const override
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
edm::propagate_const< TClass * > tc_
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistoryRegistry const &processHistoryRegistry, DelayedReader *reader=0)
HLT enums.
void reset() const
Definition: InputSource.h:345
uint32 code() const
Definition: InitMessage.h:65
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:151
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:311
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:165
std::string processName() const
Definition: InitMessage.cc:123
void updateFromInput(ProductList const &other)
void setProcessHistoryID(ProcessHistoryID const &phid)
Definition: RunAuxiliary.h:26
unsigned int RunNumber_t
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:886
static unsigned int uncompressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
uint32 descLength() const
Definition: InitMessage.h:85
uint64 event() const
Definition: EventMessage.cc:76
#define FDEBUG(lev)
Definition: DebugMacros.h:19
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
SendDescs const & descs() const
static void fillDescription(ParameterSetDescription &description)
def move(src, dest)
Definition: eostools.py:511
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
WrapperBase const * getIt(ProductID const &pid) const override
static Registry * instance()
Definition: Registry.cc:12
void loadExtraClasses()
Definition: ClassFiller.cc:33
bool eventCached() const
Definition: InputSource.h:356
std::vector< unsigned char > dest_