Go to the documentation of this file.00001
00003
00004 #include "DQMServices/Core/interface/MonitorElement.h"
00005 #include "EventFilter/StorageManager/interface/CurlInterface.h"
00006 #include "EventFilter/StorageManager/src/EventServerProxy.icc"
00007 #include "EventFilter/StorageManager/src/EventStreamHttpReader.h"
00008 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00009 #include "FWCore/ServiceRegistry/interface/Service.h"
00010 #include "FWCore/Utilities/interface/Exception.h"
00011
00012 #include <string>
00013
00014
00015 namespace edm
00016 {
00017 EventStreamHttpReader::EventStreamHttpReader
00018 (
00019 ParameterSet const& pset,
00020 InputSourceDescription const& desc
00021 ):
00022 StreamerInputSource(pset, desc),
00023 eventServerProxy_(pset),
00024 dqmStore_(0),
00025 dqmStoreAvailabiltyChecked_(false),
00026 dropOldLumisectionEvents_(pset.getUntrackedParameter<bool>("dropOldLumisectionEvents", false)),
00027 consumerName_(pset.getUntrackedParameter<std::string>("consumerName")),
00028 totalDroppedEvents_(0),
00029 lastLS_(0)
00030 {
00031
00032 inputFileTransitionsEachEvent_ =
00033 pset.getUntrackedParameter<bool>("inputFileTransitionsEachEvent", true);
00034
00035 readHeader();
00036 }
00037
00038
00039 EventPrincipal* EventStreamHttpReader::read()
00040 {
00041 initializeDQMStore();
00042
00043 stor::CurlInterface::Content data;
00044 unsigned int currentLS(0);
00045 unsigned int droppedEvents(0);
00046
00047 do
00048 {
00049 eventServerProxy_.getOneEvent(data);
00050 if ( data.empty() ) return 0;
00051
00052 HeaderView hdrView(&data[0]);
00053 if (hdrView.code() == Header::DONE)
00054 {
00055 setEndRun();
00056 return 0;
00057 }
00058
00059 EventMsgView eventView(&data[0]);
00060 currentLS = eventView.lumi();
00061 droppedEvents += eventView.droppedEventsCount();
00062 }
00063 while (
00064 dropOldLumisectionEvents_ &&
00065 lastLS_ > currentLS
00066 );
00067
00068 lastLS_ = currentLS;
00069
00070 if (dqmStore_)
00071 {
00072 MonitorElement* me = dqmStore_->get("SM_SMPS_Stats/droppedEventsCount_" + consumerName_ );
00073 if (!me){
00074 dqmStore_->setCurrentFolder("SM_SMPS_Stats");
00075 me = dqmStore_->bookInt("droppedEventsCount_" + consumerName_ );
00076 }
00077 totalDroppedEvents_ += droppedEvents;
00078 me->Fill(totalDroppedEvents_);
00079 }
00080
00081 return deserializeEvent(EventMsgView(&data[0]));
00082 }
00083
00084
00085 void EventStreamHttpReader::readHeader()
00086 {
00087 stor::CurlInterface::Content data;
00088
00089 eventServerProxy_.getInitMsg(data);
00090 InitMsgView initView(&data[0]);
00091 deserializeAndMergeWithRegistry(initView);
00092 }
00093
00094
00095 void EventStreamHttpReader::initializeDQMStore()
00096 {
00097 if ( ! dqmStoreAvailabiltyChecked_ )
00098 {
00099 try
00100 {
00101 dqmStore_ = edm::Service<DQMStore>().operator->();
00102 }
00103 catch (cms::Exception& e)
00104 {
00105 edm::LogInfo("EventStreamHttpReader")
00106 << "Service DQMStore not defined. Will not record the number of dropped events.";
00107 }
00108 dqmStoreAvailabiltyChecked_ = true;
00109 }
00110 }
00111
00112
00113 }
00114
00115