CMS 3D CMS Logo

StreamerInputSource.cc
Go to the documentation of this file.
2 
6 
18 
19 #include "zlib.h"
20 #include "lzma.h"
21 #include "zstd.h"
22 
31 
35 
36 #include <string>
37 #include <iostream>
38 #include <set>
39 
40 namespace edm {
41  namespace {
42  int const init_size = 1024 * 1024;
43  }
44 
46  : RawInputSource(pset, desc),
47  tc_(getTClass(typeid(SendEvent))),
48  dest_(init_size),
49  xbuf_(TBuffer::kRead, init_size),
50  sendEvent_(),
51  eventPrincipalHolder_(),
52  adjustEventToNewProductRegistry_(false),
53  processName_(),
54  protocolVersion_(0U) {}
55 
57 
58  // ---------------------------------------
60  ProductRegistry& reg,
61  BranchIDListHelper& branchIDListHelper,
62  ThinnedAssociationsHelper& thinnedHelper,
63  bool subsequent) {
64  SendDescs const& descs = header.descs();
65 
66  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
67 
68  if (subsequent) {
69  ProductRegistry pReg;
70  pReg.updateFromInput(descs);
72  if (!mergeInfo.empty()) {
73  throw cms::Exception("MismatchedInput", "RootInputFileSequence::previousEvent()") << mergeInfo;
74  }
75  branchIDListHelper.updateFromInput(header.branchIDLists());
76  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
77  } else {
78  declareStreamers(descs);
79  buildClassCache(descs);
81  if (!reg.frozen()) {
82  reg.updateFromInput(descs);
83  }
84  branchIDListHelper.updateFromInput(header.branchIDLists());
85  thinnedHelper.updateFromPrimaryInput(header.thinnedAssociationsHelper());
86  }
87  }
88 
90  std::vector<std::string> missingDictionaries;
91  std::vector<std::string> branchNamesForMissing;
92  std::vector<std::string> producedTypes;
93  for (auto const& item : descs) {
94  //pi->init();
95  std::string const real_name = wrappedClassName(item.className());
96  FDEBUG(6) << "declare: " << real_name << std::endl;
97  if (!loadCap(real_name, missingDictionaries)) {
98  branchNamesForMissing.emplace_back(item.branchName());
99  producedTypes.emplace_back(item.className() + std::string(" (read from input)"));
100  }
101  }
102  if (!missingDictionaries.empty()) {
103  std::string context("Calling StreamerInputSource::declareStreamers, checking dictionaries for input types");
104  throwMissingDictionariesException(missingDictionaries, context, producedTypes, branchNamesForMissing, true);
105  }
106  }
107 
109  for (auto const& item : descs) {
110  //pi->init();
111  std::string const real_name = wrappedClassName(item.className());
112  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
113  doBuildRealData(real_name);
114  }
115  }
116 
121  std::unique_ptr<SendJobHeader> StreamerInputSource::deserializeRegistry(InitMsgView const& initView) {
122  if (initView.code() != Header::INIT)
123  throw cms::Exception("StreamTranslation", "Registry deserialization error")
124  << "received wrong message type: expected INIT, got " << initView.code() << "\n";
125 
126  //Get the process name and store if for Protocol version 4 and above.
127  if (initView.protocolVersion() > 3) {
128  processName_ = initView.processName();
129  protocolVersion_ = initView.protocolVersion();
130 
131  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = " << processName_ << std::endl;
132  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= " << protocolVersion_ << std::endl;
133  }
134 
135  // calculate the adler32 checksum
136  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(), initView.descLength());
137  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
138  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
139  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
140  if ((uint32)adler32_chksum != initView.adler32_chksum()) {
141  // skip event (based on option?) or throw exception?
142  throw cms::Exception("StreamDeserialization", "Checksum error")
143  << " chksum from registry data = " << adler32_chksum << " from header = " << initView.adler32_chksum()
144  << " host name = " << initView.hostName() << std::endl;
145  }
146 
147  TClass* desc = getTClass(typeid(SendJobHeader));
148 
149  TBufferFile xbuf(
150  TBuffer::kRead, initView.descLength(), const_cast<char*>((char const*)initView.descData()), kFALSE);
151  RootDebug tracer(10, 10);
152  std::unique_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
153 
154  if (sd.get() == nullptr) {
155  throw cms::Exception("StreamTranslation", "Registry deserialization error")
156  << "Could not read the initial product registry list\n";
157  }
158 
159  sd->initializeTransients();
160  return sd;
161  }
162 
167  void StreamerInputSource::deserializeAndMergeWithRegistry(InitMsgView const& initView, bool subsequent) {
168  std::unique_ptr<SendJobHeader> sd = deserializeRegistry(initView);
170  if (subsequent) {
172  }
173  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
174  pset::Registry& psetRegistry = *pset::Registry::instance();
175  for (auto const& item : psetMap) {
176  ParameterSet pset(item.second.pset());
177  pset.setID(item.first);
178  psetRegistry.insertMapped(pset);
179  }
180  }
181 
186  if (eventView.code() != Header::EVENT)
187  throw cms::Exception("StreamTranslation", "Event deserialization error")
188  << "received wrong message type: expected EVENT, got " << eventView.code() << "\n";
189  FDEBUG(9) << "Decode event: " << eventView.event() << " " << eventView.run() << " " << eventView.size() << " "
190  << eventView.adler32_chksum() << " " << eventView.eventLength() << " " << eventView.eventData()
191  << std::endl;
192  // uncompress if we need to
193  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
194  // need to get rid of this when 090 MTCC streamers are gotten rid of
195  unsigned long origsize = eventView.origDataSize();
196  unsigned long dest_size; //(should be >= eventView.origDataSize())
197 
198  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
199  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
200  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
201  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
202  if ((uint32)adler32_chksum != eventView.adler32_chksum()) {
203  // skip event (based on option?) or throw exception?
204  throw cms::Exception("StreamDeserialization", "Checksum error")
205  << " chksum from event = " << adler32_chksum << " from header = " << eventView.adler32_chksum()
206  << " host name = " << eventView.hostName() << std::endl;
207  }
208  if (origsize != 78 && origsize != 0) {
209  // compressed
210  if (isBufferLZMA((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
211  dest_size = uncompressBufferLZMA(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
212  eventView.eventLength(),
213  dest_,
214  origsize);
215  } else if (isBufferZSTD((unsigned char const*)eventView.eventData(), eventView.eventLength())) {
216  dest_size = uncompressBufferZSTD(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
217  eventView.eventLength(),
218  dest_,
219  origsize);
220  } else
221  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
222  eventView.eventLength(),
223  dest_,
224  origsize);
225  } else { // not compressed
226  // we need to copy anyway the buffer as we are using dest in xbuf
227  dest_size = eventView.eventLength();
228  dest_.resize(dest_size);
229  unsigned char* pos = (unsigned char*)&dest_[0];
230  unsigned char const* from = (unsigned char const*)eventView.eventData();
231  std::copy(from, from + dest_size, pos);
232  }
233  //TBuffer xbuf(TBuffer::kRead, dest_size,
234  // (char const*) &dest[0],kFALSE);
235  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
236  // (char const*) eventView.eventData(),kFALSE);
237  xbuf_.Reset();
238  xbuf_.SetBuffer(&dest_[0], dest_size, kFALSE);
239  RootDebug tracer(10, 10);
240 
241  //We do not yet know which EventPrincipal we will use, therefore
242  // we are using a new EventPrincipalHolder as a proxy. We need to
243  // make a new one instead of reusing the same one becuase when running
244  // multi-threaded there will be multiple EventPrincipals being used
245  // simultaneously.
246  eventPrincipalHolder_ = std::make_unique<EventPrincipalHolder>(); // propagate_const<T> has no reset() function
248  {
249  std::shared_ptr<void> refCoreStreamerGuard(nullptr, [](void*) {
251  ;
252  });
253  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
254  }
255 
256  if (sendEvent_.get() == nullptr) {
257  throw cms::Exception("StreamTranslation", "Event deserialization error")
258  << "got a null event from input stream\n";
259  }
261 
262  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
263  if (runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run() ||
264  runAuxiliary()->processHistoryID() != sendEvent_->processHistory().id()) {
266  new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
267  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
270  }
273  runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
274  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
276  }
277  setEventCached();
278  }
279 
283  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
284  assert(eventOK);
286  }
287  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
288  BranchListIndexes indexes(sendEvent_->branchListIndexes());
289  branchIDListHelper()->fixBranchListIndexes(indexes);
290  auto history = processHistoryRegistry().getMapped(sendEvent_->aux().processHistoryID());
291  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), history, std::move(ids), std::move(indexes));
292 
293  //We now know which eventPrincipal to use and we can reuse the slot in
294  // streamToEventPrincipalHolders to own the memory
295  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
296  if (streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() + 1) {
297  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() + 1);
298  }
300 
301  // no process name list handling
302 
303  SendProds& sps = sendEvent_->products();
304  for (auto& spitem : sps) {
305  FDEBUG(10) << "check prodpair" << std::endl;
306  if (spitem.desc() == nullptr)
307  throw cms::Exception("StreamTranslation", "Empty Provenance");
308  FDEBUG(5) << "Prov:"
309  << " " << spitem.desc()->className() << " " << spitem.desc()->productInstanceName() << " "
310  << spitem.desc()->branchID() << std::endl;
311 
312  BranchDescription const branchDesc(*spitem.desc());
313  // This ProductProvenance constructor inserts into the entry description registry
314  if (spitem.parents()) {
315  std::optional<ProductProvenance> productProvenance{std::in_place, spitem.branchID(), *spitem.parents()};
316  if (spitem.prod() != nullptr) {
317  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
318  eventPrincipal.putOnRead(branchDesc,
319  std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())),
320  std::move(productProvenance));
321  FDEBUG(10) << "addproduct done" << std::endl;
322  } else {
323  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
324  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), std::move(productProvenance));
325  FDEBUG(10) << "addproduct empty done" << std::endl;
326  }
327  } else {
328  std::optional<ProductProvenance> productProvenance;
329  if (spitem.prod() != nullptr) {
330  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
331  eventPrincipal.putOnRead(
332  branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
333  FDEBUG(10) << "addproduct done" << std::endl;
334  } else {
335  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
336  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
337  FDEBUG(10) << "addproduct empty done" << std::endl;
338  }
339  }
340  spitem.clear();
341  }
342 
343  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
344  }
345 
354  unsigned int StreamerInputSource::uncompressBuffer(unsigned char* inputBuffer,
355  unsigned int inputSize,
356  std::vector<unsigned char>& outputBuffer,
357  unsigned int expectedFullSize) {
358  unsigned long origSize = expectedFullSize;
359  unsigned long uncompressedSize = expectedFullSize * 1.1;
360  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
361  outputBuffer.resize(uncompressedSize);
362  int ret = uncompress(&outputBuffer[0], &uncompressedSize, inputBuffer, inputSize); // do not need compression level
363  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
364  if (ret == Z_OK) {
365  // check the length against original uncompressed length
366  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
367  if (origSize != uncompressedSize) {
368  // we throw an error and return without event! null pointer
369  throw cms::Exception("StreamDeserialization", "Uncompression error")
370  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
371  }
372  } else {
373  // we throw an error and return without event! null pointer
374  throw cms::Exception("StreamDeserialization", "Uncompression error") << "Error code = " << ret << "\n ";
375  }
376  return (unsigned int)uncompressedSize;
377  }
378 
379  bool StreamerInputSource::isBufferLZMA(unsigned char const* inputBuffer, unsigned int inputSize) {
380  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "XZ"))
381  return true;
382  else
383  return false;
384  }
385 
386  unsigned int StreamerInputSource::uncompressBufferLZMA(unsigned char* inputBuffer,
387  unsigned int inputSize,
388  std::vector<unsigned char>& outputBuffer,
389  unsigned int expectedFullSize,
390  bool hasHeader) {
391  unsigned long origSize = expectedFullSize;
392  unsigned long uncompressedSize = expectedFullSize * 1.1;
393  FDEBUG(1) << "Uncompress: original size = " << origSize << ", compressed size = " << inputSize << std::endl;
394  outputBuffer.resize(uncompressedSize);
395 
396  lzma_stream stream = LZMA_STREAM_INIT;
397  lzma_ret returnStatus;
398 
399  returnStatus = lzma_stream_decoder(&stream, UINT64_MAX, 0U);
400  if (returnStatus != LZMA_OK) {
401  throw cms::Exception("StreamDeserializationLZM", "LZMA stream decoder error")
402  << "Error code = " << returnStatus << "\n ";
403  }
404 
405  size_t hdrSize = hasHeader ? 4 : 0;
406  stream.next_in = (const uint8_t*)(inputBuffer + hdrSize);
407  stream.avail_in = (size_t)(inputSize - hdrSize);
408  stream.next_out = (uint8_t*)&outputBuffer[0];
409  stream.avail_out = (size_t)uncompressedSize;
410 
411  returnStatus = lzma_code(&stream, LZMA_FINISH);
412  if (returnStatus != LZMA_STREAM_END) {
413  lzma_end(&stream);
414  throw cms::Exception("StreamDeserializationLZM", "LZMA uncompression error")
415  << "Error code = " << returnStatus << "\n ";
416  }
417  lzma_end(&stream);
418 
419  uncompressedSize = (unsigned int)stream.total_out;
420 
421  FDEBUG(10) << " original size = " << origSize << " final size = " << uncompressedSize << std::endl;
422  if (origSize != uncompressedSize) {
423  // we throw an error and return without event! null pointer
424  throw cms::Exception("StreamDeserialization", "LZMA uncompression error")
425  << "mismatch event lengths should be" << origSize << " got " << uncompressedSize << "\n";
426  }
427 
428  return uncompressedSize;
429  }
430 
431  bool StreamerInputSource::isBufferZSTD(unsigned char const* inputBuffer, unsigned int inputSize) {
432  if (inputSize >= 4 && !strcmp((const char*)inputBuffer, "ZS"))
433  return true;
434  else
435  return false;
436  }
437 
438  unsigned int StreamerInputSource::uncompressBufferZSTD(unsigned char* inputBuffer,
439  unsigned int inputSize,
440  std::vector<unsigned char>& outputBuffer,
441  unsigned int expectedFullSize,
442  bool hasHeader) {
443  unsigned long uncompressedSize = expectedFullSize * 1.1;
444  FDEBUG(1) << "Uncompress: original size = " << expectedFullSize << ", compressed size = " << inputSize << std::endl;
445  outputBuffer.resize(uncompressedSize);
446 
447  size_t hdrSize = hasHeader ? 4 : 0;
448  size_t ret = ZSTD_decompress(
449  (void*)&(outputBuffer[0]), uncompressedSize, (const void*)(inputBuffer + hdrSize), inputSize - hdrSize);
450 
451  if (ZSTD_isError(ret)) {
452  throw cms::Exception("StreamDeserializationZSTD", "ZSTD uncompression error")
453  << "Error core " << ret << ", message:" << ZSTD_getErrorName(ret);
454  }
455  return (unsigned int)ret;
456  }
457 
459  // called from an online streamer source to reset after a stop command
460  // so an enable command will work
463  assert(!eventCached());
464  reset();
465  }
466 
468  // Need to define a dummy setRun here or else the InputSource::setRun is called
469  // if we have a source inheriting from this and wants to define a setRun method
470  throw Exception(errors::LogicError) << "StreamerInputSource::setRun()\n"
471  << "Run number cannot be modified for this type of Input Source\n"
472  << "Contact a Storage Manager Developer\n";
473  }
474 
476 
478 
480  return eventPrincipal_ ? eventPrincipal_->getIt(id) : nullptr;
481  }
482 
484  unsigned int& index) const {
485  return eventPrincipal_ ? eventPrincipal_->getThinnedProduct(id, index) : nullptr;
486  }
487 
489  std::vector<WrapperBase const*>& wrappers,
490  std::vector<unsigned int>& keys) const {
491  if (eventPrincipal_)
492  eventPrincipal_->getThinnedProducts(pid, wrappers, keys);
493  }
494 
496  assert(eventPrincipal_ != nullptr);
497  return eventPrincipal_->transitionIndex();
498  }
499 
501 
503 } // namespace edm
runTheMatrix.ret
ret
prodAgent to be discontinued
Definition: runTheMatrix.py:355
edm::pset::Registry::instance
static Registry * instance()
Definition: Registry.cc:12
edm::SendEvent
Definition: StreamedProducts.h:71
ThinnedAssociationsHelper.h
edm::RunNumber_t
unsigned int RunNumber_t
Definition: RunLumiEventNumber.h:14
edm::RootDebug
Definition: ClassFiller.h:15
InitMsgView::adler32_chksum
uint32 adler32_chksum() const
Definition: InitMessage.h:89
edm::ProcessHistoryRegistry::registerProcessHistory
bool registerProcessHistory(ProcessHistory const &processHistory)
Definition: ProcessHistoryRegistry.cc:11
InitMsgView::protocolVersion
uint32 protocolVersion() const
Definition: InitMessage.cc:109
edm::StreamerInputSource::deserializeRegistry
std::unique_ptr< SendJobHeader > deserializeRegistry(InitMsgView const &initView)
Definition: StreamerInputSource.cc:121
edm::throwMissingDictionariesException
void throwMissingDictionariesException(std::vector< std::string > &missingDictionaries, std::string const &context)
Definition: DictionaryTools.cc:193
funct::false
false
Definition: Factorize.h:34
filterCSVwithJSON.copy
copy
Definition: filterCSVwithJSON.py:36
InitMsgView::descLength
uint32 descLength() const
Definition: InitMessage.h:85
edm::InputSource::thinnedAssociationsHelper
std::shared_ptr< ThinnedAssociationsHelper const > thinnedAssociationsHelper() const
Accessors for thinnedAssociationsHelper.
Definition: InputSource.h:165
edm::StreamerInputSource::tc_
edm::propagate_const< TClass * > tc_
Definition: StreamerInputSource.h:114
edm::errors::LogicError
Definition: EDMException.h:37
edm::StreamerInputSource::deserializeEvent
void deserializeEvent(EventMsgView const &eventView)
Definition: StreamerInputSource.cc:185
InitMsgView::processName
std::string processName() const
Definition: InitMessage.cc:123
BranchIDListHelper.h
edm::doBuildRealData
void doBuildRealData(const std::string &name)
Definition: ClassFiller.cc:22
edm
HLT enums.
Definition: AlignableModifier.h:19
cms::cuda::stream
cudaStream_t stream
Definition: HistoContainer.h:57
pos
Definition: PixelAliasList.h:18
edm::Principal::adjustToNewProductRegistry
bool adjustToNewProductRegistry(ProductRegistry const &reg)
Definition: Principal.cc:310
edm::StreamerInputSource::EventPrincipalHolder::getIt
WrapperBase const * getIt(ProductID const &id) const override
Definition: StreamerInputSource.cc:479
edm::ParameterSetDescription
Definition: ParameterSetDescription.h:52
edm::StreamerInputSource::fillDescription
static void fillDescription(ParameterSetDescription &description)
Definition: StreamerInputSource.cc:502
edm::BranchListIndexes
std::vector< BranchListIndex > BranchListIndexes
Definition: BranchListIndex.h:18
edm::StreamerInputSource::declareStreamers
static void declareStreamers(SendDescs const &descs)
Definition: StreamerInputSource.cc:89
edm::ProductRegistry::updateFromInput
void updateFromInput(ProductList const &other)
Definition: ProductRegistry.cc:221
edm::InputSourceDescription
Definition: InputSourceDescription.h:20
cms::cuda::assert
assert(be >=bs)
edm::StreamerInputSource::uncompressBuffer
static unsigned int uncompressBuffer(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
Definition: StreamerInputSource.cc:354
edm::StreamerInputSource::~StreamerInputSource
~StreamerInputSource() override
Definition: StreamerInputSource.cc:56
edm::StreamerInputSource::sendEvent_
edm::propagate_const< std::unique_ptr< SendEvent > > sendEvent_
Definition: StreamerInputSource.h:117
ProcessHistoryRegistry.h
edm::StreamID::value
unsigned int value() const
Definition: StreamID.h:42
relativeConstraints.keys
keys
Definition: relativeConstraints.py:89
ProductRegistry.h
InitMessage.h
edm::StreamerInputSource::isBufferLZMA
bool isBufferLZMA(unsigned char const *inputBuffer, unsigned int inputSize)
Definition: StreamerInputSource.cc:379
edm::StreamerInputSource::buildClassCache
static void buildClassCache(SendDescs const &descs)
Definition: StreamerInputSource.cc:108
edm::RawInputSource
Definition: RawInputSource.h:17
Header::INIT
Definition: MsgHeader.h:15
edm::InputSource::processHistoryRegistryForUpdate
ProcessHistoryRegistry & processHistoryRegistryForUpdate()
Definition: InputSource.h:327
EventMsgView
Definition: EventMessage.h:72
edm::InputSource::productRegistry
std::shared_ptr< ProductRegistry const > productRegistry() const
Accessors for product registry.
Definition: InputSource.h:151
uint32
unsigned int uint32
Definition: MsgTools.h:13
edm::StreamerInputSource::xbuf_
TBufferFile xbuf_
Definition: StreamerInputSource.h:116
edm::StreamerInputSource::streamToEventPrincipalHolders_
std::vector< edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > > streamToEventPrincipalHolders_
Definition: StreamerInputSource.h:119
edm::StreamerInputSource::deserializeAndMergeWithRegistry
void deserializeAndMergeWithRegistry(InitMsgView const &initView, bool subsequent=false)
Definition: StreamerInputSource.cc:167
edm::Principal::adjustIndexesAfterProductRegistryAddition
void adjustIndexesAfterProductRegistryAddition()
Definition: Principal.cc:884
EventMsgView::origDataSize
uint32 origDataSize() const
Definition: EventMessage.cc:86
edm::ProductRegistry
Definition: ProductRegistry.h:34
edm::BranchDescription::Permissive
Definition: BranchDescription.h:36
edm::LuminosityBlockAuxiliary
Definition: LuminosityBlockAuxiliary.h:15
EDMException.h
edm::StreamerInputSource::protocolVersion_
unsigned int protocolVersion_
Definition: StreamerInputSource.h:123
EventMsgView::adler32_chksum
uint32 adler32_chksum() const
Definition: EventMessage.h:96
InitMsgView::hostName
std::string hostName() const
Definition: InitMessage.cc:183
edm::EventPrincipal
Definition: EventPrincipal.h:46
Header::EVENT
Definition: MsgHeader.h:16
RefCoreStreamer.h
cms::Adler32
void Adler32(char const *data, size_t len, uint32_t &a, uint32_t &b)
Definition: Adler32Calculator.cc:10
EventPrincipal.h
EventMsgView::code
uint32 code() const
Definition: EventMessage.h:76
ProductProvenance.h
BranchListIndex.h
StreamerInputSource.h
edm::ThinnedAssociationsHelper::updateFromPrimaryInput
void updateFromPrimaryInput(ThinnedAssociationsHelper const &)
Definition: ThinnedAssociationsHelper.cc:168
edm::InputSource::luminosityBlockAuxiliary
std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary() const
Called by the framework to merge or insert lumi in principal cache.
Definition: InputSource.h:242
edm::StreamerInputSource::adjustEventToNewProductRegistry_
bool adjustEventToNewProductRegistry_
Definition: StreamerInputSource.h:120
EventMsgView::run
uint32 run() const
Definition: EventMessage.cc:71
edm::getTClass
TClass * getTClass(const std::type_info &ti)
Definition: ClassFiller.cc:63
edm::StreamerInputSource::EventPrincipalHolder::transitionIndex_
unsigned int transitionIndex_() const override
Definition: StreamerInputSource.cc:495
edm::StreamerInputSource::setRun
void setRun(RunNumber_t r) override
Definition: StreamerInputSource.cc:467
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::StreamerInputSource::processName_
std::string processName_
Definition: StreamerInputSource.h:122
edm::StreamerInputSource::EventPrincipalHolder::setEventPrincipal
void setEventPrincipal(EventPrincipal *ep)
Definition: StreamerInputSource.cc:500
edm::InputSource::runAuxiliary
std::shared_ptr< RunAuxiliary > runAuxiliary() const
Called by the framework to merge or insert run in principal cache.
Definition: InputSource.h:239
LuminosityBlockAuxiliary.h
edm::ThinnedAssociationsHelper
Definition: ThinnedAssociationsHelper.h:35
BranchDescription.h
edm::InputSource::processHistoryRegistry
ProcessHistoryRegistry const & processHistoryRegistry() const
Accessors for process history registry.
Definition: InputSource.h:155
edm::InputSource::setRunAuxiliary
void setRunAuxiliary(RunAuxiliary *rp)
Definition: InputSource.h:329
edm::ParameterSet
Definition: ParameterSet.h:36
edm::InputSource::reset
void reset() const
Definition: InputSource.h:345
edm::InputSource::productRegistryUpdate
ProductRegistry & productRegistryUpdate()
Definition: InputSource.h:326
edm::StreamerInputSource::EventPrincipalHolder::getThinnedProduct
WrapperBase const * getThinnedProduct(ProductID const &, unsigned int &) const override
Definition: StreamerInputSource.cc:483
edm::StreamerInputSource::resetAfterEndRun
void resetAfterEndRun()
Definition: StreamerInputSource.cc:458
EventMsgView::hostName
std::string hostName() const
Definition: EventMessage.cc:121
FDEBUG
#define FDEBUG(lev)
Definition: DebugMacros.h:19
EventMsgView::event
uint64 event() const
Definition: EventMessage.cc:76
EventMsgView::eventLength
uint32 eventLength() const
Definition: EventMessage.h:81
edm::get
T const & get(Event const &event, InputTag const &tag) noexcept(false)
Definition: Event.h:669
edm::BranchIDListHelper
Definition: BranchIDListHelper.h:15
createfilelist.int
int
Definition: createfilelist.py:10
edm::EventPrincipal::putOnRead
void putOnRead(BranchDescription const &bd, std::unique_ptr< WrapperBase > edp, std::optional< ProductProvenance > productProvenance) const
Definition: EventPrincipal.cc:209
edm::StreamerInputSource::uncompressBufferZSTD
static unsigned int uncompressBufferZSTD(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
Definition: StreamerInputSource.cc:438
edm::InputSource::luminosityBlock
LuminosityBlockNumber_t luminosityBlock() const
Accessor for current luminosity block number.
Definition: InputSource.cc:442
edm::WrapperBase
Definition: WrapperBase.h:23
B2GTnPMonitor_cfi.item
item
Definition: B2GTnPMonitor_cfi.py:147
edm::EventPrincipal::streamID
StreamID streamID() const
Definition: EventPrincipal.h:106
InitMsgView::code
uint32 code() const
Definition: InitMessage.h:65
edm::wrappedClassName
std::string wrappedClassName(std::string const &iFullName)
Definition: WrappedClassName.cc:4
edm::StreamerInputSource::EventPrincipalHolder::getThinnedProducts
void getThinnedProducts(ProductID const &pid, std::vector< WrapperBase const * > &wrappers, std::vector< unsigned int > &keys) const override
Definition: StreamerInputSource.cc:488
edm::pset::Registry::insertMapped
bool insertMapped(value_type const &v, bool forceUpdate=false)
Definition: Registry.cc:32
edm::StreamerInputSource::isBufferZSTD
bool isBufferZSTD(unsigned char const *inputBuffer, unsigned int inputSize)
Definition: StreamerInputSource.cc:431
FileBlock.h
edm::EventPrincipal::fillEventPrincipal
void fillEventPrincipal(EventAuxiliary const &aux, ProcessHistory const *processHistory, DelayedReader *reader=nullptr)
Definition: EventPrincipal.cc:103
edm::InputSource::setLuminosityBlockAuxiliary
void setLuminosityBlockAuxiliary(LuminosityBlockAuxiliary *lbp)
Definition: InputSource.h:333
edm::SendJobHeader::ParameterSetMap
std::map< ParameterSetID, ParameterSetBlob > ParameterSetMap
Definition: StreamedProducts.h:104
EventMsgView::size
uint32 size() const
Definition: EventMessage.h:77
Registry.h
edm::InputSource::branchIDListHelper
std::shared_ptr< BranchIDListHelper const > branchIDListHelper() const
Accessors for branchIDListHelper.
Definition: InputSource.h:159
edm::Timestamp::invalidTimestamp
static Timestamp invalidTimestamp()
Definition: Timestamp.h:82
edm::InputSource::eventCached
bool eventCached() const
Definition: InputSource.h:356
WrappedClassName.h
edm::StreamerInputSource::eventPrincipalHolder_
edm::propagate_const< std::unique_ptr< EventPrincipalHolder > > eventPrincipalHolder_
Definition: StreamerInputSource.h:118
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::loadCap
bool loadCap(const std::string &name, std::vector< std::string > &missingDictionaries)
Definition: ClassFiller.cc:16
edm::StreamerInputSource::mergeIntoRegistry
static void mergeIntoRegistry(SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
Definition: StreamerInputSource.cc:59
edm::Principal::size
size_t size() const
Definition: Principal.cc:298
EventAuxiliary.h
Exception
Definition: hltDiff.cc:246
edm::StreamerInputSource::dest_
std::vector< unsigned char > dest_
Definition: StreamerInputSource.h:115
RunAuxiliary.h
edm::ProcessHistoryRegistry::getMapped
bool getMapped(ProcessHistoryID const &key, ProcessHistory &value) const
Definition: ProcessHistoryRegistry.cc:29
EventMessage.h
edm::InputSource::setEventCached
void setEventCached()
Called by the framework to merge or ached() const {return eventCached_;}.
Definition: InputSource.h:358
edm::loadExtraClasses
void loadExtraClasses()
Definition: ClassFiller.cc:33
Adler32Calculator.h
edm::StreamerInputSource::EventPrincipalHolder::~EventPrincipalHolder
~EventPrincipalHolder() override
Definition: StreamerInputSource.cc:477
edm::setRefCoreStreamer
void setRefCoreStreamer(bool resetAll=false)
Definition: RefCoreStreamer.cc:83
Exception.h
edm::ProductRegistry::frozen
bool frozen() const
Definition: ProductRegistry.h:129
edm::StreamerInputSource::uncompressBufferLZMA
static unsigned int uncompressBufferLZMA(unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize, bool hasHeader=true)
Definition: StreamerInputSource.cc:386
edm::InputSource::resetLuminosityBlockAuxiliary
void resetLuminosityBlockAuxiliary(bool isNewLumi=true) const
Definition: InputSource.h:341
AlignmentPI::index
index
Definition: AlignmentPayloadInspectorHelper.h:46
edm::StreamerInputSource::StreamerInputSource
StreamerInputSource(ParameterSet const &pset, InputSourceDescription const &desc)
Definition: StreamerInputSource.cc:45
sd
double sd
Definition: CascadeWrapper.h:113
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
edm::BranchDescription
Definition: BranchDescription.h:32
edm::InputSource::resetRunAuxiliary
void resetRunAuxiliary(bool isNewRun=true) const
Definition: InputSource.h:337
ClassFiller.h
DebugMacros.h
cms::Exception
Definition: Exception.h:70
edm::RawInputSource::fillDescription
static void fillDescription(ParameterSetDescription &description)
Definition: RawInputSource.cc:105
EventMsgView::lumi
uint32 lumi() const
Definition: EventMessage.cc:81
edm::ProductRegistry::merge
std::string merge(ProductRegistry const &other, std::string const &fileName, BranchDescription::MatchMode branchesMustMatch=BranchDescription::Permissive)
Definition: ProductRegistry.cc:262
ParameterSet.h
InitMsgView::descData
const uint8 * descData() const
Definition: InitMessage.h:86
EventMsgView::eventData
const uint8 * eventData() const
Definition: EventMessage.h:79
edm::SendJobHeader
Definition: StreamedProducts.h:102
DictionaryTools.h
edm::EventSelectionIDVector
std::vector< EventSelectionID > EventSelectionIDVector
Definition: EventSelectionID.h:16
edm::SendDescs
std::vector< BranchDescription > SendDescs
Definition: StreamedProducts.h:100
edm::BranchDescription::branchID
BranchID const & branchID() const
Definition: BranchDescription.h:74
SiStripBadComponentsDQMServiceTemplate_cfg.ep
ep
Definition: SiStripBadComponentsDQMServiceTemplate_cfg.py:86
edm::StreamerInputSource::read
void read(EventPrincipal &eventPrincipal) override
Definition: StreamerInputSource.cc:280
edm::ProductID
Definition: ProductID.h:27
edm::pset::Registry
Definition: Registry.h:26
EventSelectionID.h
muonDTDigis_cfi.pset
pset
Definition: muonDTDigis_cfi.py:27
edm::SendProds
std::vector< StreamedProduct > SendProds
Definition: StreamedProducts.h:67
edm::StreamerInputSource::EventPrincipalHolder::EventPrincipalHolder
EventPrincipalHolder()
Definition: StreamerInputSource.cc:475
edm::RunAuxiliary
Definition: RunAuxiliary.h:15
InitMsgView
Definition: InitMessage.h:61
edm::InputSource::run
RunNumber_t run() const
Accessor for current run number.
Definition: InputSource.cc:437