CMS 3D CMS Logo

StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 #include "lzma.h"
21 #include "zstd.h"
22 
31 
35 
36 #include <string>
37 #include <iostream>
38 #include <set>
39 
40 namespace edm::streamer {
41  namespace {
42  int const init_size = 1024 * 1024;
43  }
44 
47  tc_(getTClass(typeid(SendEvent))),
48  dest_(init_size),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
51  eventPrincipalHolder_(),
52  adjustEventToNewProductRegistry_(false),
53  processName_(),
54  protocolVersion_(0U) {}
55 
57 
58  // ---------------------------------------
60  SendDescs const& descs = header.descs();
61 
62  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
63 
64  if (subsequent) {
65  ProductRegistry pReg;
66  pReg.updateFromInput(descs);
68  if (!mergeInfo.empty()) {
69  throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
70  }
71  } else {
72  declareStreamers(descs);
73  buildClassCache(descs);
75  if (!reg.frozen()) {
76  reg.updateFromInput(descs);
77  }
78  }
79  }
80 
82  std::vector<std::string> missingDictionaries;
83  std::vector<std::string> branchNamesForMissing;
84  std::vector<std::string> producedTypes;
85  for (auto const& item : descs) {
86  //pi->init();
87  std::string const real_name = wrappedClassName(item.className());
88  FDEBUG(6) << "declare: " << real_name << std::endl;
89  if (!loadCap(real_name, missingDictionaries)) {
90  branchNamesForMissing.emplace_back(item.branchName());
91  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
92  }
93  }
94  if (!missingDictionaries.empty()) {
95  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
96  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
97  }
98  }
99 
101  for (auto const& item : descs) {
102  //pi->init();
103  std::string const real_name = wrappedClassName(item.className());
104  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
105  doBuildRealData(real_name);
106  }
107  }
108 
113  std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
114  if (initView.code() != Header::INIT)
115  throw cms::Exception("StreamTranslation", "Registry deserialization error")
116  << "received wrong message type: expected INIT, got " << initView.code() << "\n";
117 
118  //Get the process name and store if for Protocol version 4 and above.
119  if (initView.protocolVersion() > 3) {
120  processName_ = initView.processName();
121  protocolVersion_ = initView.protocolVersion();
122 
123  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
124  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
125  }
126 
127  // calculate the adler32 checksum
128  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
129  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
130  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
131  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
132  if ((uint32)adler32_chksum != initView.adler32_chksum()) {
133  // skip event (based on option?) or throw exception?
134  throw cms::Exception("StreamDeserialization", "Checksum error")
135  << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
136  << " host name = " << initView.hostName() << std::endl;
137  }
138 
139  TClass* desc = getTClass(typeid(SendJobHeader));
140 
141  TBufferFile xbuf(
142  TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
143  RootDebug tracer(10, 10);
144  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
145 
146  if (sd.get() == nullptr) {
147  throw cms::Exception("StreamTranslation", "Registry deserialization error")
148  << "Could not read the initial product registry list\n";
149  }
150 
151  sd->initializeTransients();
152  return sd;
153  }
154 
159  void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
160  std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
161  mergeIntoRegistry(*sd, productRegistryUpdate(), subsequent);
162  if (subsequent) {
164  }
165  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
166  pset::Registry& psetRegistry = *pset::Registry::instance();
167  for (auto const& item : psetMap) {
168  ParameterSet pset(item.second.pset());
169  pset.setID(item.first);
170  psetRegistry.insertMapped(pset);
171  }
172  }
173 
175  branchIDListHelper()->updateFromInput(sendEvent_->branchIDLists());
176  thinnedAssociationsHelper()->updateFromPrimaryInput(sendEvent_->thinnedAssociationsHelper());
177  }
178 
179  uint32_t StreamerInputSource::eventMetaDataChecksum(EventMsgView const& eventView) const {
180  return eventView.adler32_chksum();
181  }
182 
184  deserializeEventCommon(eventView, true);
185  }
190  deserializeEventCommon(eventView, false);
191  }
192 
193  void StreamerInputSource::deserializeEventCommon(EventMsgView const& eventView, bool isMetaData) {
194  if (eventView.code() != Header::EVENT)
195  throw cms::Exception("StreamTranslation", "Event deserialization error")
196  << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
197  FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
198  << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
199  << std::endl;
200  // uncompress if we need to
201  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
202  // need to get rid of this when 090 MTCC streamers are gotten rid of
203  unsigned long origsize = eventView.origDataSize();
204  unsigned long dest_size; //(should be >= eventView.origDataSize())
205 
206  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
207  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
208  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
209  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
210  if (static_cast<uint32>(adler32_chksum) != eventView.adler32_chksum()) {
211  // skip event (based on option?) or throw exception?
212  throw cms::Exception("StreamDeserialization", "Checksum error")
213  << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
214  << " host name = " << eventView.hostName() << std::endl;
215  }
216  if (origsize != 78 && origsize != 0) {
217  // compressed
218  if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
219  dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
220  eventView.eventLength(),
221  dest_,
222  origsize);
223  } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
224  dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
225  eventView.eventLength(),
226  dest_,
227  origsize);
228  } else
229  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
230  eventView.eventLength(),
231  dest_,
232  origsize);
233  } else { // not compressed
234  // we need to copy anyway the buffer as we are using dest in xbuf
235  dest_size = eventView.eventLength();
236  dest_.resize(dest_size);
237  unsigned char* pos = (unsigned char*)&dest_[0];
238  unsigned char const* from = (unsigned char const*)eventView.eventData();
239  std::copy(from, from + dest_size, pos);
240  }
241  //TBuffer xbuf(TBuffer::kRead, dest_size,
242  // (char const*) &dest[0],kFALSE);
243  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
244  // (char const*) eventView.eventData(),kFALSE);
245  xbuf_.Reset();
246  xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
247  RootDebug tracer(10, 10);
248 
249  //We do not yet know which EventPrincipal we will use, therefore
250  // we are using a new EventPrincipalHolder as a proxy. We need to
251  // make a new one instead of reusing the same one becuase when running
252  // multi-threaded there will be multiple EventPrincipals being used
253  // simultaneously.
254  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
256  {
257  std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
259  ;
260  });
261  sendEvent_ = std::unique_ptr<SendEvent>(reinterpret_cast<SendEvent*>(xbuf_.ReadObjectAny(tc_)));
262  }
263 
264  if (sendEvent_.get() == nullptr) {
265  throw cms::Exception("StreamTranslation", "Event deserialization error")
266  << "got a null event from input stream\n";
267  }
268 
269  if (isMetaData) {
270  eventMetaDataChecksum_ = adler32_chksum;
271  return;
272  }
273 
274  if (sendEvent_->metaDataChecksum() != eventMetaDataChecksum_) {
275  throw cms::Exception("StreamTranslation") << " meta data checksum from event " << sendEvent_->metaDataChecksum()
276  << " does not match last read meta data " << eventMetaDataChecksum_;
277  }
278 
280 
281  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
282  if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
283  runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
285  new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
286  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
289  }
292  runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
293  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
295  }
296  setEventCached();
297  }
298 
302  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
303  assert(eventOK);
305  }
306  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
307  BranchListIndexes indexes(sendEvent_->branchListIndexes());
308  branchIDListHelper()->fixBranchListIndexes(indexes);
309  auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
310  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
311 
312  //We now know which eventPrincipal to use and we can reuse the slot in
313  // streamToEventPrincipalHolders to own the memory
314  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
315  if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
316  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
317  }
319 
320  // no process name list handling
321 
322  SendProds& sps = sendEvent_->products();
323  for (auto& spitem : sps) {
324  FDEBUG(10) << "check prodpair" << std::endl;
325  if (spitem.desc() == nullptr)
326  throw cms::Exception("StreamTranslation", "Empty Provenance");
327  FDEBUG(5) << "Prov:"
328  << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
329  << spitem.desc()->branchID() << std::endl;
330 
331  BranchDescription const branchDesc(*spitem.desc());
332  // This ProductProvenance constructor inserts into the entry description registry
333  if (spitem.parents()) {
334  std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
335  if (spitem.prod() != nullptr) {
336  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
337  eventPrincipal.putOnRead(branchDesc,
338  std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
339  std::move(productProvenance));
340  FDEBUG(10) << "addproduct done" << std::endl;
341  } else {
342  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
343  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
344  FDEBUG(10) << "addproduct empty done" << std::endl;
345  }
346  } else {
347  std::optional<ProductProvenance> productProvenance;
348  if (spitem.prod() != nullptr) {
349  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
350  eventPrincipal.putOnRead(
351  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
352  FDEBUG(10) << "addproduct done" << std::endl;
353  } else {
354  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
355  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
356  FDEBUG(10) << "addproduct empty done" << std::endl;
357  }
358  }
359  spitem.clear();
360  }
361 
362  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
363  }
364 
373  unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
374  unsigned int inputSize,
375  std::vector<unsigned char>& outputBuffer,
376  unsigned int expectedFullSize) {
377  unsigned long origSize = expectedFullSize;
378  unsigned long uncompressedSize = expectedFullSize * 1.1;
379  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
380  outputBuffer.resize(uncompressedSize);
381  int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize); // do not need compression level
382  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
383  if (ret == Z_OK) {
384  // check the length against original uncompressed length
385  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
386  if (origSize != uncompressedSize) {
387  // we throw an error and return without event! null pointer
388  throw cms::Exception("StreamDeserialization", "Uncompression error")
389  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
390  }
391  } else {
392  // we throw an error and return without event! null pointer
393  throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
394  }
395  return (unsigned int)uncompressedSize;
396  }
397 
398  bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
399  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
400  return true;
401  else
402  return false;
403  }
404 
405  unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
406  unsigned int inputSize,
407  std::vector<unsigned char>& outputBuffer,
408  unsigned int expectedFullSize,
409  bool hasHeader) {
410  unsigned long origSize = expectedFullSize;
411  unsigned long uncompressedSize = expectedFullSize * 1.1;
412  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
413  outputBuffer.resize(uncompressedSize);
414 
415  lzma_stream stream = LZMA_STREAM_INIT;
416  lzma_ret returnStatus;
417 
418  returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
419  if (returnStatus != LZMA_OK) {
420  throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
421  << "Error code = " << returnStatus << "\n ";
422  }
423 
424  size_t hdrSize = hasHeader ? 4 : 0;
425  stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
426  stream.avail_in = (size_t)(inputSize - hdrSize);
427  stream.next_out = (uint8_t*)&outputBuffer[0];
428  stream.avail_out = (size_t)uncompressedSize;
429 
430  returnStatus = lzma_code(&stream, LZMA_FINISH);
431  if (returnStatus != LZMA_STREAM_END) {
432  lzma_end(&stream);
433  throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
434  << "Error code = " << returnStatus << "\n ";
435  }
436  lzma_end(&stream);
437 
438  uncompressedSize = (unsigned int)stream.total_out;
439 
440  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
441  if (origSize != uncompressedSize) {
442  // we throw an error and return without event! null pointer
443  throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
444  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
445  }
446 
447  return uncompressedSize;
448  }
449 
450  bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
451  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
452  return true;
453  else
454  return false;
455  }
456 
457  unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
458  unsigned int inputSize,
459  std::vector<unsigned char>& outputBuffer,
460  unsigned int expectedFullSize,
461  bool hasHeader) {
462  unsigned long uncompressedSize = expectedFullSize * 1.1;
463  FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
464  outputBuffer.resize(uncompressedSize);
465 
466  size_t hdrSize = hasHeader ? 4 : 0;
467  size_t ret = ZSTD_decompress(
468  (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
469 
470  if (ZSTD_isError(ret)) {
471  throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
472  << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
473  }
474  return (unsigned int)ret;
475  }
476 
478  // called from an online streamer source to reset after a stop command
479  // so an enable command will work
482  assert(!eventCached());
483  reset();
484  }
485 
487  // Need to define a dummy setRun here or else the InputSource::setRun is called
488  // if we have a source inheriting from this and wants to define a setRun method
489  throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
490  << "Run number cannot be modified for this type of Input Source\n"
491  << "Contact a Storage Manager Developer\n";
492  }
493 
495 
497 
499  return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
500  }
501 
502  std::optional<std::tuple<edm::WrapperBase const*, unsigned int>>
504  if (eventPrincipal_)
505  return eventPrincipal_->getThinnedProduct(id, index);
506  return std::nullopt;
507  }
508 
510  std::vector<WrapperBase const*>& wrappers,
511  std::vector<unsigned int>& keys) const {
512  if (eventPrincipal_)
513  eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
514  }
515 
517  edm::ProductID const& parent, unsigned int index, edm::ProductID const& thinned) const {
518  if (eventPrincipal_) {
519  return eventPrincipal_->getThinnedKeyFrom(parent, index, thinned);
520  } else {
521  return std::monostate{};
522  }
523  }
524 
526  assert(eventPrincipal_ != nullptr);
527  return eventPrincipal_->transitionIndex();
528  }
529 
531 
533 } // namespace edm::streamer
static unsigned int uncompressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
std::optional< std::tuple< edm::WrapperBase const *, unsigned int > > getThinnedProduct(ProductID const &, unsigned int) const override
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:261
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:359
static Timestamp invalidTimestamp()
Definition: Timestamp.h:75
const uint8 * eventData() const
Definition: EventMessage.h:79
std::variant< unsigned int, detail::GetThinnedKeyFromExceptionFactory, std::monostate > OptionalThinnedKey
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:22
void deserializeEventMetaData(EventMsgView const &eventView)
BranchID const & branchID() const
ret
prodAgent to be discontinued
std::vector< BranchDescription > SendDescs
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
void reset() const
Definition: InputSource.h:378
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:458
bool registerProcessHistory(ProcessHistory const &processHistory)
OptionalThinnedKey getThinnedKeyFrom(ProductID const &, unsigned int, ProductID const &) const override
bool isBufferZSTD(unsigned char const *inputBuffer, unsigned int inputSize)
void setRefCoreStreamer(bool resetAll=false)
void deserializeEventCommon(EventMsgView const &eventView, bool isMetaData)
static void fillDescription(ParameterSetDescription &description)
size_t size() const
Definition: Principal.cc:300
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
bool isBufferLZMA(unsigned char const *inputBuffer, unsigned int inputSize)
uint32_t T const *__restrict__ uint32_t const *__restrict__ int32_t int Histo::index_type cudaStream_t stream
#define FDEBUG(lev)
Definition: DebugMacros.h:19
StreamID streamID() const
void read(EventPrincipal &eventPrincipal) override
assert(be >=bs)
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:370
std::string hostName() const
Definition: InitMessage.cc:186
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:184
std::vector< EventSelectionID > EventSelectionIDVector
edm::propagate_const< TClass * > tc_
void setRun(RunNumber_t r) override
const uint8 * descData() const
Definition: InitMessage.h:87
std::vector< BranchListIndex > BranchListIndexes
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
static unsigned int uncompressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:366
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:172
uint32 protocolVersion() const
Definition: InitMessage.cc:111
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
bool insertMapped(value_type const &v, bool forceUpdate=false)
Definition: Registry.cc:32
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
void deserializeEvent(EventMsgView const &eventView)
uint32 descLength() const
Definition: InitMessage.h:86
bool eventCached() const
Definition: InputSource.h:389
std::vector< StreamedProduct > SendProds
std::string hostName() const
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:391
uint32 adler32_chksum() const
Definition: EventMessage.h:97
std::string processName() const
Definition: InitMessage.cc:125
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:168
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, bool subsequent)
uint32_t eventMetaDataChecksum(EventMsgView const &eventView) const
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, std::optional< ProductProvenance > productProvenance) const
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
std::string wrappedClassName(std::string const &iFullName)
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:165
uint32 adler32_chksum() const
Definition: InitMessage.h:90
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
Definition: ClassFiller.cc:16
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:374
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:360
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:463
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:362
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:264
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:312
static void buildClassCache(SendDescs const &descs)
void updateFromInput(ProductList const &other)
unsigned int RunNumber_t
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:873
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const *> &wrappers, std::vector< unsigned int > &keys) const override
unsigned int value() const
Definition: StreamID.h:43
uint32 eventLength() const
Definition: EventMessage.h:81
unsigned int uint32
Definition: MsgTools.h:14
WrapperBase const * getIt(ProductID const &id) const override
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
static void fillDescription(ParameterSetDescription &description)
def move(src, dest)
Definition: eostools.py:511
static Registry * instance()
Definition: Registry.cc:12
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
std::vector< unsigned char > dest_
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
static void declareStreamers(SendDescs const &descs)
void loadExtraClasses()
Definition: ClassFiller.cc:33