edm::StreamerInputSource Class Reference

#include <StreamerInputSource.h>

Inheritance diagram for edm::StreamerInputSource:
Base classes: edm::RawInputSource, edm::InputSource, edm::ProductRegistryHelper
Derived classes: dqmservices::DQMStreamerReader, edm::StreamerFileReader, edm::StreamerInputModule< Producer >

Classes

class  EventPrincipalHolder
 

Public Member Functions

void deserializeAndMergeWithRegistry (InitMsgView const &initView, bool subsequent=false)
 
void deserializeEvent (EventMsgView const &eventView)
 
std::auto_ptr< SendJobHeader > deserializeRegistry (InitMsgView const &initView)
 
 StreamerInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~StreamerInputSource ()
 
- Public Member Functions inherited from edm::RawInputSource
 RawInputSource (ParameterSet const &pset, InputSourceDescription const &desc)
 
virtual ~RawInputSource ()
 
- Public Member Functions inherited from edm::InputSource
std::shared_ptr< ActivityRegistry > actReg () const
 Accessor for Activity Registry.

std::shared_ptr< BranchIDListHelper > branchIDListHelper () const
 Accessor for branchIDListHelper.

void closeFile (FileBlock *, bool cleaningUpAfterException)
 Close current file.

void doBeginJob ()
 Called by framework at beginning of job.

void doBeginLumi (LuminosityBlockPrincipal &lbp, ProcessContext const *)
 Called by framework at beginning of lumi block.

void doBeginRun (RunPrincipal &rp, ProcessContext const *)
 Called by framework at beginning of run.

void doEndJob ()
 Called by framework at end of job.

void doEndLumi (LuminosityBlockPrincipal &lbp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of lumi block.

void doEndRun (RunPrincipal &rp, bool cleaningUpAfterException, ProcessContext const *)
 Called by framework at end of run.

void doPostForkReacquireResources (std::shared_ptr< multicore::MessageReceiverForSource >)

void doPreForkReleaseResources ()
 Called by the framework before forking the process.
 
ProcessingController::ForwardState forwardState () const
 
bool goToEvent (EventID const &eventID)

 InputSource (ParameterSet const &, InputSourceDescription const &)
 Constructor.

 InputSource (InputSource const &)=delete

void issueReports (EventID const &eventID)
 Issue an event report.

LuminosityBlockNumber_t luminosityBlock () const
 Accessor for current luminosity block number.

std::shared_ptr< LuminosityBlockAuxiliary > luminosityBlockAuxiliary () const
 Called by the framework to merge or insert lumi in principal cache.

int maxEvents () const

int maxLuminosityBlocks () const

ModuleDescription const & moduleDescription () const
 Accessor for 'module' description.

ItemType nextItemType ()
 Advances the source to the next item.

InputSource & operator= (InputSource const &)=delete
 
bool primary () const
 Accessor for primary input source flag.

ProcessConfiguration const & processConfiguration () const
 Accessor for Process Configuration.

std::string const & processGUID () const
 Accessor for global process identifier.

ProcessHistoryRegistry const & processHistoryRegistry () const
 Const accessor for process history registry.

ProcessHistoryRegistry & processHistoryRegistryForUpdate ()
 Non-const accessor for process history registry.

ProcessingMode processingMode () const
 RunsLumisAndEvents (default), RunsAndLumis, or Runs.

std::shared_ptr< ProductRegistry const > productRegistry () const
 Accessor for product registry.
 
bool randomAccess () const
 
void readAndMergeLumi (LuminosityBlockPrincipal &lbp)
 Read next luminosity block (same as a prior lumi).

void readAndMergeRun (RunPrincipal &rp)
 Read next run (same as a prior run).

void readEvent (EventPrincipal &ep, StreamContext &)
 Read next event.

bool readEvent (EventPrincipal &ep, EventID const &, StreamContext &)
 Read a specific event.

std::unique_ptr< FileBlock > readFile ()
 Read next file.

void readLuminosityBlock (LuminosityBlockPrincipal &lumiPrincipal, HistoryAppender &historyAppender)
 Read next luminosity block (new lumi).

std::shared_ptr< LuminosityBlockAuxiliary > readLuminosityBlockAuxiliary ()
 Read next luminosity block Auxiliary.

void readRun (RunPrincipal &runPrincipal, HistoryAppender &historyAppender)
 Read next run (new run).

std::shared_ptr< RunAuxiliary > readRunAuxiliary ()
 Read next run Auxiliary.
 
ProcessHistoryID const & reducedProcessHistoryID () const
 
void registerProducts ()
 Register any produced products.

int remainingEvents () const

int remainingLuminosityBlocks () const

void repeat ()
 Reset the remaining number of events/lumis to the maximum number.

SharedResourcesAcquirer * resourceSharedWithDelayedReader () const
 Returns nullptr if no resource shared between the Source and a DelayedReader.

ProcessingController::ReverseState reverseState () const

void rewind ()
 Begin again at the first event.

RunNumber_t run () const
 Accessor for current run number.

std::shared_ptr< RunAuxiliary > runAuxiliary () const
 Called by the framework to merge or insert run in principal cache.

void setLuminosityBlockNumber_t (LuminosityBlockNumber_t lb)
 Set the luminosity block ID.

void setRunNumber (RunNumber_t r)
 Set the run number.

void skipEvents (int offset)

bool skipForForking ()

std::shared_ptr< ThinnedAssociationsHelper > thinnedAssociationsHelper () const
 Accessor for thinnedAssociationsHelper.

Timestamp const & timestamp () const
 Accessor for the current time, as seen by the input source.

virtual ~InputSource ()
 Destructor.
 

Static Public Member Functions

static void fillDescription (ParameterSetDescription &description)
 
static void mergeIntoRegistry (SendJobHeader const &header, ProductRegistry &, BranchIDListHelper &, ThinnedAssociationsHelper &, bool subsequent)
 
static unsigned int uncompressBuffer (unsigned char *inputBuffer, unsigned int inputSize, std::vector< unsigned char > &outputBuffer, unsigned int expectedFullSize)
 
- Static Public Member Functions inherited from edm::RawInputSource
static void fillDescription (ParameterSetDescription &description)
 
- Static Public Member Functions inherited from edm::InputSource
static const std::string & baseType ()
 
static void fillDescription (ParameterSetDescription &desc)
 
static void fillDescriptions (ConfigurationDescriptions &descriptions)
 
static void prevalidate (ConfigurationDescriptions &)
 

Protected Member Functions

void resetAfterEndRun ()
 
- Protected Member Functions inherited from edm::RawInputSource
virtual bool checkNextEvent ()=0
 
void makeEvent (EventPrincipal &eventPrincipal, EventAuxiliary const &eventAuxiliary)
 
void setInputFileTransitionsEachEvent ()
 
- Protected Member Functions inherited from edm::InputSource
void decreaseRemainingEventsBy (int iSkipped)
 
bool eventCached () const
 
std::shared_ptr< LuminosityBlockPrincipal > const luminosityBlockPrincipal () const

bool newLumi () const

bool newRun () const

ProcessHistoryRegistry & processHistoryRegistryUpdate () const

ProductRegistry & productRegistryUpdate () const
 
void reset () const
 
void resetEventCached ()
 
void resetLuminosityBlockAuxiliary (bool isNewLumi=true) const
 
void resetNewLumi ()
 
void resetNewRun ()
 
void resetRunAuxiliary (bool isNewRun=true) const
 
std::shared_ptr< RunPrincipal > const runPrincipal () const

void setEventCached ()
 Marks an event as cached.
 
void setLuminosityBlockAuxiliary (LuminosityBlockAuxiliary *lbp)
 
void setNewLumi ()
 
void setNewRun ()
 
void setRunAuxiliary (RunAuxiliary *rp)
 
void setTimestamp (Timestamp const &theTime)
 To set the current time, as seen by the input source.
 
virtual void skip (int offset)
 
ItemType state () const
 

Static Protected Member Functions

static void buildClassCache (SendDescs const &descs)
 
static void declareStreamers (SendDescs const &descs)
 

Private Member Functions

virtual void read (EventPrincipal &eventPrincipal)
 
virtual std::unique_ptr< FileBlock > readFile_ ()
 
virtual void setRun (RunNumber_t r)
 

Private Attributes

bool adjustEventToNewProductRegistry_
 
std::vector< unsigned char > dest_
 
std::unique_ptr< EventPrincipalHolder > eventPrincipalHolder_

std::string processName_

unsigned int protocolVersion_

std::unique_ptr< SendEvent > sendEvent_

std::vector< std::unique_ptr< EventPrincipalHolder > > streamToEventPrincipalHolders_
 
TClass * tc_
 
TBufferFile xbuf_
 

Additional Inherited Members

- Public Types inherited from edm::InputSource
enum  ItemType {
  IsInvalid, IsStop, IsFile, IsRun,
  IsLumi, IsEvent, IsRepeat, IsSynchronize
}
 
enum  ProcessingMode { Runs, RunsAndLumis, RunsLumisAndEvents }
 
typedef ProductRegistryHelper::TypeLabelList TypeLabelList
 

Detailed Description

Definition at line 31 of file StreamerInputSource.h.

Constructor & Destructor Documentation

edm::StreamerInputSource::StreamerInputSource ( ParameterSet const &  pset,
InputSourceDescription const &  desc 
)
explicit

Definition at line 42 of file StreamerInputSource.cc.

44  :
45  RawInputSource(pset, desc),
46  tc_(getTClass(typeid(SendEvent))),
48  xbuf_(TBuffer::kRead, init_size),
49  sendEvent_(),
52  processName_(),
53  protocolVersion_(0U) {
54  }
edm::StreamerInputSource::~StreamerInputSource ( )
virtual

Definition at line 56 of file StreamerInputSource.cc.

56 {}

Member Function Documentation

void edm::StreamerInputSource::buildClassCache ( SendDescs const &  descs)
staticprotected

Definition at line 108 of file StreamerInputSource.cc.

References edm::doBuildRealData(), FDEBUG, std::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

108  {
109  for(auto const& item : descs) {
110  //pi->init();
111  std::string const real_name = wrappedClassName(item.className());
112  FDEBUG(6) << "BuildReadData: " << real_name << std::endl;
113  doBuildRealData(real_name);
114  }
115  }
void edm::StreamerInputSource::declareStreamers ( SendDescs const &  descs)
staticprotected

Definition at line 97 of file StreamerInputSource.cc.

References FDEBUG, edm::loadCap(), std::string, and edm::wrappedClassName().

Referenced by mergeIntoRegistry().

97  {
98  for(auto const& item : descs) {
99  //pi->init();
100  std::string const real_name = wrappedClassName(item.className());
101  FDEBUG(6) << "declare: " << real_name << std::endl;
102  loadCap(real_name);
103  }
104  }
void edm::StreamerInputSource::deserializeAndMergeWithRegistry ( InitMsgView const &  initView,
bool  subsequent = false 
)

Deserializes the specified init message into a SendJobHeader object and merges registries.

Definition at line 171 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::InputSource::branchIDListHelper(), deserializeRegistry(), edm::pset::Registry::insertMapped(), edm::pset::Registry::instance(), mergeIntoRegistry(), edm::InputSource::productRegistryUpdate(), edm::ParameterSet::setID(), and edm::InputSource::thinnedAssociationsHelper().

Referenced by edm::StreamerFileReader::checkNextEvent(), dqmservices::DQMStreamerReader::checkNextEvent(), dqmservices::DQMStreamerReader::openFile_(), edm::StreamerFileReader::reset_(), and edm::StreamerInputModule< Producer >::StreamerInputModule().

171  {
172  std::auto_ptr<SendJobHeader> sd = deserializeRegistry(initView);
174  if (subsequent) {
176  }
177  SendJobHeader::ParameterSetMap const& psetMap = sd->processParameterSet();
178  pset::Registry& psetRegistry = *pset::Registry::instance();
179  for (auto const& item : psetMap) {
180  ParameterSet pset(item.second.pset());
181  pset.setID(item.first);
182  psetRegistry.insertMapped(pset);
183  }
184  }
void edm::StreamerInputSource::deserializeEvent ( EventMsgView const &  eventView)

Deserializes the specified event message.

Definition at line 190 of file StreamerInputSource.cc.

References cms::Adler32(), EventMsgView::adler32_chksum(), std::cerr, EventMsgView::code(), std::copy(), dest_, Header::EVENT, EventMsgView::event(), EventMsgView::eventData(), EventMsgView::eventLength(), eventPrincipalHolder_, cms::Exception, FDEBUG, EventMsgView::hostName(), edm::Timestamp::invalidTimestamp(), EventMsgView::lumi(), edm::InputSource::luminosityBlock(), edm::InputSource::luminosityBlockAuxiliary(), EventMsgView::origDataSize(), edm::InputSource::processHistoryRegistryUpdate(), edm::ProcessHistoryRegistry::registerProcessHistory(), edm::InputSource::resetLuminosityBlockAuxiliary(), EventMsgView::run(), edm::InputSource::run(), edm::InputSource::runAuxiliary(), sendEvent_, edm::InputSource::setEventCached(), edm::InputSource::setLuminosityBlockAuxiliary(), edm::RunAuxiliary::setProcessHistoryID(), edm::LuminosityBlockAuxiliary::setProcessHistoryID(), edm::setRefCoreStreamer(), edm::InputSource::setRunAuxiliary(), EventMsgView::size(), tc_, uncompressBuffer(), and xbuf_.

Referenced by edm::StreamerFileReader::checkNextEvent(), and dqmservices::DQMStreamerReader::checkNextEvent().

190  {
191  if(eventView.code() != Header::EVENT)
192  throw cms::Exception("StreamTranslation","Event deserialization error")
193  << "received wrong message type: expected EVENT, got "
194  << eventView.code() << "\n";
195  FDEBUG(9) << "Decode event: "
196  << eventView.event() << " "
197  << eventView.run() << " "
198  << eventView.size() << " "
199  << eventView.adler32_chksum() << " "
200  << eventView.eventLength() << " "
201  << eventView.eventData()
202  << std::endl;
203  // uncompress if we need to
204  // 78 was a dummy value (for no uncompressed) - should be 0 for uncompressed
205  // need to get rid of this when 090 MTCC streamers are gotten rid of
206  unsigned long origsize = eventView.origDataSize();
207  unsigned long dest_size; //(should be >= eventView.origDataSize())
208 
209  uint32_t adler32_chksum = cms::Adler32((char const*)eventView.eventData(), eventView.eventLength());
210  //std::cout << "Adler32 checksum of event = " << adler32_chksum << std::endl;
211  //std::cout << "Adler32 checksum from header = " << eventView.adler32_chksum() << " "
212  // << "host name = " << eventView.hostName() << " len = " << eventView.hostName_len() << std::endl;
213  if((uint32)adler32_chksum != eventView.adler32_chksum()) {
214  std::cerr << "Error from StreamerInputSource: checksum of event data blob failed "
215  << " chksum from event = " << adler32_chksum << " from header = "
216  << eventView.adler32_chksum() << " host name = " << eventView.hostName() << std::endl;
217  // skip event (based on option?) or throw exception?
218  }
219  if(origsize != 78 && origsize != 0) {
220  // compressed
221  dest_size = uncompressBuffer(const_cast<unsigned char*>((unsigned char const*)eventView.eventData()),
222  eventView.eventLength(), dest_, origsize);
223  } else { // not compressed
224  // we need to copy anyway the buffer as we are using dest in xbuf
225  dest_size = eventView.eventLength();
226  dest_.resize(dest_size);
227  unsigned char* pos = (unsigned char*) &dest_[0];
228  unsigned char const* from = (unsigned char const*) eventView.eventData();
229  std::copy(from,from+dest_size,pos);
230  }
231  //TBuffer xbuf(TBuffer::kRead, dest_size,
232  // (char const*) &dest[0],kFALSE);
233  //TBuffer xbuf(TBuffer::kRead, eventView.eventLength(),
234  // (char const*) eventView.eventData(),kFALSE);
235  xbuf_.Reset();
236  xbuf_.SetBuffer(&dest_[0],dest_size,kFALSE);
237  RootDebug tracer(10,10);
238 
239  //We do not yet know which EventPrincipal we will use, therefore
240  // we are using a new EventPrincipalHolder as a proxy. We need to
241  // make a new one instead of reusing the same one becuase when running
242  // multi-threaded there will be multiple EventPrincipals being used
243  // simultaneously.
244  eventPrincipalHolder_.reset( new EventPrincipalHolder() );
246  sendEvent_ = std::unique_ptr<SendEvent>((SendEvent*)xbuf_.ReadObjectAny(tc_));
248 
249  if(sendEvent_.get() == nullptr) {
250  throw cms::Exception("StreamTranslation","Event deserialization error")
251  << "got a null event from input stream\n";
252  }
253  processHistoryRegistryUpdate().registerProcessHistory(sendEvent_->processHistory());
254 
255  FDEBUG(5) << "Got event: " << sendEvent_->aux().id() << " " << sendEvent_->products().size() << std::endl;
256  if(runAuxiliary().get() == nullptr || runAuxiliary()->run() != sendEvent_->aux().run()) {
257  RunAuxiliary* runAuxiliary = new RunAuxiliary(sendEvent_->aux().run(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
258  runAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
259  setRunAuxiliary(runAuxiliary);
261  }
262  if(!luminosityBlockAuxiliary() || luminosityBlockAuxiliary()->luminosityBlock() != eventView.lumi()) {
263  LuminosityBlockAuxiliary* luminosityBlockAuxiliary =
264  new LuminosityBlockAuxiliary(runAuxiliary()->run(), eventView.lumi(), sendEvent_->aux().time(), Timestamp::invalidTimestamp());
265  luminosityBlockAuxiliary->setProcessHistoryID(sendEvent_->processHistory().id());
266  setLuminosityBlockAuxiliary(luminosityBlockAuxiliary);
267  }
268  setEventCached();
269  }
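The listing above treats an original-size field of 0, or the legacy dummy value 78, as meaning "payload is not compressed", and in that case copies the payload into the reusable destination buffer. A minimal sketch of that decision and copy, independent of CMSSW; the names `isCompressed` and `copyUncompressed` are hypothetical and do not exist in the class:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Mirrors the test in the listing: 0 and the legacy dummy value 78 both
// mean that the payload was stored without compression.
bool isCompressed(unsigned long origDataSize) {
  return origDataSize != 78 && origDataSize != 0;
}

// Hypothetical helper: copies an uncompressed payload into the reusable
// destination buffer and returns the number of bytes copied.
std::size_t copyUncompressed(unsigned char const* from, std::size_t length,
                             std::vector<unsigned char>& dest) {
  assert(from != nullptr || length == 0);
  dest.resize(length);
  std::copy(from, from + length, dest.begin());
  return length;
}
```

The copy is needed even for uncompressed data because the read buffer is pointed at the destination vector rather than at the incoming message.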
std::auto_ptr< SendJobHeader > edm::StreamerInputSource::deserializeRegistry ( InitMsgView const &  initView)

Deserializes the specified init message into a SendJobHeader object (which is related to the product registry).

Definition at line 122 of file StreamerInputSource.cc.

References cms::Adler32(), InitMsgView::adler32_chksum(), std::cerr, InitMsgView::code(), InitMsgView::descData(), InitMsgView::descLength(), cms::Exception, FDEBUG, edm::getTClass(), InitMsgView::hostName(), Header::INIT, InitMsgView::processName(), processName_, InitMsgView::protocolVersion(), and protocolVersion_.

Referenced by deserializeAndMergeWithRegistry().

122  {
123  if(initView.code() != Header::INIT)
124  throw cms::Exception("StreamTranslation","Registry deserialization error")
125  << "received wrong message type: expected INIT, got "
126  << initView.code() << "\n";
127 
128  //Get the process name and store it for Protocol version 4 and above.
129  if (initView.protocolVersion() > 3) {
130 
131  processName_ = initView.processName();
132  protocolVersion_ = initView.protocolVersion();
133 
134  FDEBUG(10) << "StreamerInputSource::deserializeRegistry processName = "<< processName_<< std::endl;
135  FDEBUG(10) << "StreamerInputSource::deserializeRegistry protocolVersion_= "<< protocolVersion_<< std::endl;
136  }
137 
138  // calculate the adler32 checksum
139  uint32_t adler32_chksum = cms::Adler32((char const*)initView.descData(),initView.descLength());
140  //std::cout << "Adler32 checksum of init message = " << adler32_chksum << std::endl;
141  //std::cout << "Adler32 checksum of init messsage from header = " << initView.adler32_chksum() << " "
142  // << "host name = " << initView.hostName() << " len = " << initView.hostName_len() << std::endl;
143  if((uint32)adler32_chksum != initView.adler32_chksum()) {
144  std::cerr << "Error from StreamerInputSource: checksum of Init registry blob failed "
145  << " chksum from registry data = " << adler32_chksum << " from header = "
146  << initView.adler32_chksum() << " host name = " << initView.hostName() << std::endl;
147  // skip event (based on option?) or throw exception?
148  }
149 
150  TClass* desc = getTClass(typeid(SendJobHeader));
151 
152  TBufferFile xbuf(TBuffer::kRead, initView.descLength(),
153  const_cast<char*>((char const*)initView.descData()),kFALSE);
154  RootDebug tracer(10,10);
155  std::auto_ptr<SendJobHeader> sd((SendJobHeader*)xbuf.ReadObjectAny(desc));
156 
157  if(sd.get() == nullptr) {
158  throw cms::Exception("StreamTranslation","Registry deserialization error")
159  << "Could not read the initial product registry list\n";
160  }
161 
162  sd->initializeTransients();
163  return sd;
164  }
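The listing above recomputes an Adler-32 checksum over the message payload and compares it with the value carried in the message header. The sketch below shows that comparison with a plain Adler-32 as defined in RFC 1950. It is a stand-in for `cms::Adler32`, not the CMSSW implementation, and `adler32` and `checksumMatches` are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Plain Adler-32 over a byte range (RFC 1950). Stand-in for cms::Adler32.
std::uint32_t adler32(unsigned char const* data, std::size_t len) {
  assert(data != nullptr || len == 0);
  constexpr std::uint32_t kMod = 65521;  // largest prime below 2^16
  std::uint32_t a = 1;
  std::uint32_t b = 0;
  for (std::size_t i = 0; i < len; ++i) {
    a = (a + data[i]) % kMod;
    b = (b + a) % kMod;
  }
  return (b << 16) | a;
}

// Hypothetical helper: true when the payload matches the header checksum.
bool checksumMatches(unsigned char const* payload, std::size_t len,
                     std::uint32_t headerChecksum) {
  return adler32(payload, len) == headerChecksum;
}
```

Note that in the listing a mismatch is only reported on `std::cerr`; deserialization continues, and whether to skip or throw is left as an open question in the source comment.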
void edm::StreamerInputSource::fillDescription ( ParameterSetDescription description)
static

Definition at line 420 of file StreamerInputSource.cc.

References edm::RawInputSource::fillDescription().

Referenced by edm::StreamerFileReader::fillDescriptions(), and dqmservices::DQMStreamerReader::fillDescriptions().

420  {
422  }
void edm::StreamerInputSource::mergeIntoRegistry ( SendJobHeader const &  header,
ProductRegistry reg,
BranchIDListHelper branchIDListHelper,
ThinnedAssociationsHelper thinnedHelper,
bool  subsequent 
)
static

Definition at line 65 of file StreamerInputSource.cc.

References edm::SendJobHeader::branchIDLists(), buildClassCache(), declareStreamers(), edm::SendJobHeader::descs(), cms::Exception, FDEBUG, edm::ProductRegistry::frozen(), edm::loadExtraClasses(), edm::ProductRegistry::merge(), edm::BranchDescription::Permissive, std::string, edm::SendJobHeader::thinnedAssociationsHelper(), edm::BranchIDListHelper::updateFromInput(), edm::ProductRegistry::updateFromInput(), and edm::ThinnedAssociationsHelper::updateFromInput().

Referenced by deserializeAndMergeWithRegistry().

69  {
70 
71  SendDescs const& descs = header.descs();
72 
73  FDEBUG(6) << "mergeIntoRegistry: Product List: " << std::endl;
74 
75  if (subsequent) {
76  ProductRegistry pReg;
77  pReg.updateFromInput(descs);
78  std::string mergeInfo = reg.merge(pReg, std::string(), BranchDescription::Permissive);
79  if (!mergeInfo.empty()) {
80  throw cms::Exception("MismatchedInput","RootInputFileSequence::previousEvent()") << mergeInfo;
81  }
82  branchIDListHelper.updateFromInput(header.branchIDLists());
83  thinnedHelper.updateFromInput(header.thinnedAssociationsHelper(), false, std::vector<BranchID>());
84  } else {
85  declareStreamers(descs);
86  buildClassCache(descs);
88  if(!reg.frozen()) {
89  reg.updateFromInput(descs);
90  }
91  branchIDListHelper.updateFromInput(header.branchIDLists());
92  thinnedHelper.updateFromInput(header.thinnedAssociationsHelper(), false, std::vector<BranchID>());
93  }
94  }
void edm::StreamerInputSource::read ( EventPrincipal eventPrincipal)
privatevirtual

Implements edm::RawInputSource.

Definition at line 272 of file StreamerInputSource.cc.

References adjustEventToNewProductRegistry_, edm::Principal::adjustIndexesAfterProductRegistryAddition(), edm::Principal::adjustToNewProductRegistry(), assert(), edm::InputSource::branchIDListHelper(), eventPrincipalHolder_, FDEBUG, edm::EventPrincipal::fillEventPrincipal(), std::move(), edm::InputSource::processHistoryRegistry(), edm::InputSource::productRegistry(), edm::EventPrincipal::putOnRead(), sendEvent_, edm::Principal::size(), edm::EventPrincipal::streamID(), streamToEventPrincipalHolders_, and edm::StreamID::value().

272  {
274  eventPrincipal.adjustIndexesAfterProductRegistryAddition();
275  bool eventOK = eventPrincipal.adjustToNewProductRegistry(*productRegistry());
276  assert(eventOK);
278  }
279  EventSelectionIDVector ids(sendEvent_->eventSelectionIDs());
280  BranchListIndexes indexes(sendEvent_->branchListIndexes());
281  branchIDListHelper()->fixBranchListIndexes(indexes);
282  eventPrincipal.fillEventPrincipal(sendEvent_->aux(), processHistoryRegistry(), std::move(ids), std::move(indexes));
283 
284  //We now know which eventPrincipal to use and we can reuse the slot in
285  // streamToEventPrincipalHolders to own the memory
286  eventPrincipalHolder_->setEventPrincipal(&eventPrincipal);
287  if(streamToEventPrincipalHolders_.size() < eventPrincipal.streamID().value() +1) {
288  streamToEventPrincipalHolders_.resize(eventPrincipal.streamID().value() +1);
289  }
290  streamToEventPrincipalHolders_[eventPrincipal.streamID().value()] = std::move(eventPrincipalHolder_);
291 
292  // no process name list handling
293 
294  SendProds& sps = sendEvent_->products();
295  for(auto& spitem : sps) {
296  FDEBUG(10) << "check prodpair" << std::endl;
297  if(spitem.desc() == nullptr)
298  throw cms::Exception("StreamTranslation","Empty Provenance");
299  FDEBUG(5) << "Prov:"
300  << " " << spitem.desc()->className()
301  << " " << spitem.desc()->productInstanceName()
302  << " " << spitem.desc()->branchID()
303  << std::endl;
304 
305  BranchDescription const branchDesc(*spitem.desc());
306  // This ProductProvenance constructor inserts into the entry description registry
307  ProductProvenance productProvenance(spitem.branchID(), *spitem.parents());
308 
309  if(spitem.prod() != nullptr) {
310  FDEBUG(10) << "addproduct next " << spitem.branchID() << std::endl;
311  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(const_cast<WrapperBase*>(spitem.prod())), productProvenance);
312  FDEBUG(10) << "addproduct done" << std::endl;
313  } else {
314  FDEBUG(10) << "addproduct empty next " << spitem.branchID() << std::endl;
315  eventPrincipal.putOnRead(branchDesc, std::unique_ptr<WrapperBase>(), productProvenance);
316  FDEBUG(10) << "addproduct empty done" << std::endl;
317  }
318  spitem.clear();
319  }
320 
321  FDEBUG(10) << "Size = " << eventPrincipal.size() << std::endl;
322  }
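In the listing above, ownership of the temporary holder is handed to a table indexed by stream ID, and the table grows on demand. A minimal sketch of that pattern, with `Holder` as a hypothetical stand-in for EventPrincipalHolder and `storeForStream` as an illustrative name that does not exist in the class:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for EventPrincipalHolder.
struct Holder {
  int payload = 0;
};

// Grow-on-demand table indexed by stream ID; each slot owns at most one holder.
void storeForStream(std::vector<std::unique_ptr<Holder>>& slots,
                    std::size_t streamIndex,
                    std::unique_ptr<Holder> holder) {
  assert(holder != nullptr);
  if (slots.size() < streamIndex + 1) {
    slots.resize(streamIndex + 1);  // new slots start out as null pointers
  }
  // Move-assignment destroys whatever the slot previously owned.
  slots[streamIndex] = std::move(holder);
}
```

Because each stream writes only to its own slot, the holder used for the previous event on that stream is released exactly when the next event for the same stream is read.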
std::unique_ptr< FileBlock > edm::StreamerInputSource::readFile_ ( )
privatevirtual

Reimplemented from edm::InputSource.

Definition at line 60 of file StreamerInputSource.cc.

60  {
61  return std::unique_ptr<FileBlock>(new FileBlock);
62  }
void edm::StreamerInputSource::resetAfterEndRun ( )
protected

Definition at line 368 of file StreamerInputSource.cc.

References assert(), edm::InputSource::eventCached(), edm::InputSource::reset(), edm::InputSource::resetLuminosityBlockAuxiliary(), and edm::InputSource::resetRunAuxiliary().

368  {
369  // called from an online streamer source to reset after a stop command
370  // so an enable command will work
373  assert(!eventCached());
374  reset();
375  }
void edm::StreamerInputSource::setRun ( RunNumber_t  r)
privatevirtual

Reimplemented from edm::InputSource.

Definition at line 377 of file StreamerInputSource.cc.

References Exception, and edm::errors::LogicError.

377  {
378  // Need to define a dummy setRun here or else the InputSource::setRun is called
379  // if we have a source inheriting from this and wants to define a setRun method
381  << "StreamerInputSource::setRun()\n"
382  << "Run number cannot be modified for this type of Input Source\n"
383  << "Contact a Storage Manager Developer\n";
384  }
unsigned int edm::StreamerInputSource::uncompressBuffer ( unsigned char *  inputBuffer,
unsigned int  inputSize,
std::vector< unsigned char > &  outputBuffer,
unsigned int  expectedFullSize 
)
static

Uncompresses the data in the specified input buffer into the specified output buffer. The inputSize should be set to the size of the compressed data in the inputBuffer. The expectedFullSize should be set to the original size of the data (before compression). Returns the actual size of the uncompressed data. Errors are reported by throwing exceptions.

Definition at line 333 of file StreamerInputSource.cc.

References std::cerr, cms::Exception, and FDEBUG.

Referenced by deserializeEvent().

336  {
337  unsigned long origSize = expectedFullSize;
338  unsigned long uncompressedSize = expectedFullSize*1.1;
339  FDEBUG(1) << "Uncompress: original size = " << origSize
340  << ", compressed size = " << inputSize
341  << std::endl;
342  outputBuffer.resize(uncompressedSize);
343  int ret = uncompress(&outputBuffer[0], &uncompressedSize,
344  inputBuffer, inputSize); // do not need compression level
345  //std::cout << "unCompress Return value: " << ret << " Okay = " << Z_OK << std::endl;
346  if(ret == Z_OK) {
347  // check the length against original uncompressed length
348  FDEBUG(10) << " original size = " << origSize << " final size = "
349  << uncompressedSize << std::endl;
350  if(origSize != uncompressedSize) {
351  std::cerr << "deserializeEvent: Problem with uncompress, original size = "
352  << origSize << " uncompress size = " << uncompressedSize << std::endl;
353  // we throw an error and return without event! null pointer
354  throw cms::Exception("StreamDeserialization","Uncompression error")
355  << "mismatch event lengths should be" << origSize << " got "
356  << uncompressedSize << "\n";
357  }
358  } else {
359  // we throw an error and return without event! null pointer
360  std::cerr << "deserializeEvent: Problem with uncompress, return value = "
361  << ret << std::endl;
362  throw cms::Exception("StreamDeserialization","Uncompression error")
363  << "Error code = " << ret << "\n ";
364  }
365  return (unsigned int) uncompressedSize;
366  }
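The sizing and validation around the decompression call can be sketched without linking against zlib by passing the decompressor in as a callable. This is an illustration under assumptions: `Decompressor` and `inflateChecked` are hypothetical names, the callable's signature is only loosely modelled on zlib's `uncompress()`, and `std::runtime_error` stands in for `cms::Exception`.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <vector>

// Writes into dest, updates *destLen with the bytes produced, returns 0 on
// success. Loosely modelled on zlib's uncompress(); an assumption of this sketch.
using Decompressor = std::function<int(unsigned char* dest, unsigned long* destLen,
                                       unsigned char const* src, unsigned long srcLen)>;

unsigned int inflateChecked(Decompressor const& decompress,
                            unsigned char const* input, unsigned int inputSize,
                            std::vector<unsigned char>& output,
                            unsigned int expectedFullSize) {
  assert(decompress);  // the callable must not be empty
  // Allocate 10% headroom over the expected size, as the listing does.
  unsigned long produced = static_cast<unsigned long>(expectedFullSize * 1.1);
  output.resize(produced);
  int const ret = decompress(output.data(), &produced, input, inputSize);
  if (ret != 0) {
    throw std::runtime_error("decompression failed");
  }
  // The decompressed length must equal the size recorded before compression.
  if (produced != expectedFullSize) {
    throw std::runtime_error("decompressed size mismatch");
  }
  return static_cast<unsigned int>(produced);
}
```

As in the listing, the output vector keeps its padded size after the call, so callers should rely on the returned length rather than on `output.size()`.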

Member Data Documentation

bool edm::StreamerInputSource::adjustEventToNewProductRegistry_
private

Definition at line 103 of file StreamerInputSource.h.

Referenced by deserializeAndMergeWithRegistry(), and read().

std::vector<unsigned char> edm::StreamerInputSource::dest_
private

Definition at line 98 of file StreamerInputSource.h.

Referenced by deserializeEvent().

std::unique_ptr<EventPrincipalHolder> edm::StreamerInputSource::eventPrincipalHolder_
private

Definition at line 101 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

std::string edm::StreamerInputSource::processName_
private

Definition at line 105 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

unsigned int edm::StreamerInputSource::protocolVersion_
private

Definition at line 106 of file StreamerInputSource.h.

Referenced by deserializeRegistry().

std::unique_ptr<SendEvent> edm::StreamerInputSource::sendEvent_
private

Definition at line 100 of file StreamerInputSource.h.

Referenced by deserializeEvent(), and read().

std::vector<std::unique_ptr<EventPrincipalHolder> > edm::StreamerInputSource::streamToEventPrincipalHolders_
private

Definition at line 102 of file StreamerInputSource.h.

Referenced by read().

TClass* edm::StreamerInputSource::tc_
private

Definition at line 97 of file StreamerInputSource.h.

Referenced by deserializeEvent().

TBufferFile edm::StreamerInputSource::xbuf_
private

Definition at line 99 of file StreamerInputSource.h.

Referenced by deserializeEvent().