![]() |
![]() |
#include <EventFilter/StorageManager/interface/DQMEventServer.h>
Public Member Functions | |
void | addConsumer (boost::shared_ptr< DQMConsumerPipe > consumer) |
Adds the specified consumer to the event server. | |
void | clearQueue () |
DQMEventServer (double maximumRate) | |
DQMEventServer constructor. | |
boost::shared_ptr < DQMConsumerPipe > | getConsumer (uint32 consumerId) |
Returns a shared pointer to the consumer pipe with the specified ID or an empty pointer if the ID was not found. | |
std::map< uint32, boost::shared_ptr < DQMConsumerPipe > > | getConsumerTable () |
boost::shared_ptr< std::vector < char > > | getDQMEvent (uint32 consumerId) |
Returns the next event for the specified consumer. | |
void | processDQMEvent (const DQMEventMsgView &eventView) |
Processes the specified event. | |
~DQMEventServer () | |
DQMEventServer destructor. | |
Private Attributes | |
std::map< uint32, boost::shared_ptr < DQMConsumerPipe > > | consumerTable |
int | disconnectedConsumerTestCounter_ |
struct timeval | lastAcceptedEventTime_ |
double | minTimeBetweenEvents_ |
Static Private Attributes | |
static const double | MAX_ACCEPT_INTERVAL = 86400.0 |
Initialize the maximum accept interval. |
Definition at line 32 of file DQMEventServer.h.
DQMEventServer::DQMEventServer | ( | double | maximumRate | ) |
DQMEventServer constructor.
One way of throttling DQMEvents is supported: specifying a maximum allowed rate of accepted events. However, it is not yet implemented, because it should be a maximum rate of accepted updates: one update can have multiple DQMEvents with the same folderID but different eventAtUpdate, arriving from different FUs at about the same time, and we want to accept all of these in order to collate them. It needs an implementation that handles the case where the events are already collated. It also means we need a buffer that can hold more than one DQMEvent.
Definition at line 35 of file DQMEventServer.cc.
References disconnectedConsumerTestCounter_, lastAcceptedEventTime_, MAX_ACCEPT_INTERVAL, and minTimeBetweenEvents_.
00036 { 00037 // determine the amount of time that we need to wait between accepted 00038 // events (to ensure that the event server doesn't send "too many" events 00039 // to consumers). The maximum rate specified to this constructor is 00040 // converted to an interval that is used internally, and the interval 00041 // is required to be somewhat reasonable. 00042 /* Note that this is not 00043 * yet implemented because it should be a maximum rate of accepted updates, 00044 * since one update can have multiple DQMEvents with the same folderID but 00045 * different eventAtUpdate from different FUs are about the same time and we 00046 * want to accept all these to collate them 00047 * Needs implementation to handle if already collated. 00048 */ 00049 if (maximumRate < (1.0 / MAX_ACCEPT_INTERVAL)) 00050 { 00051 minTimeBetweenEvents_ = MAX_ACCEPT_INTERVAL; 00052 } 00053 else 00054 { 00055 minTimeBetweenEvents_ = 1.0 / maximumRate; // seconds 00056 } 00057 00058 // initialize the last accepted event time to construction time 00059 struct timezone dummyTZ; 00060 gettimeofday(&lastAcceptedEventTime_, &dummyTZ); 00061 00062 // initialize counters 00063 disconnectedConsumerTestCounter_ = 0; 00064 }
DQMEventServer::~DQMEventServer | ( | ) |
DQMEventServer destructor.
Definition at line 69 of file DQMEventServer.cc.
References lat::endl(), and FDEBUG.
void DQMEventServer::addConsumer | ( | boost::shared_ptr< DQMConsumerPipe > | consumer | ) |
Adds the specified consumer to the event server.
Definition at line 77 of file DQMEventServer.cc.
References consumerTable.
00078 { 00079 uint32 consumerId = consumer->getConsumerId(); 00080 consumerTable[consumerId] = consumer; 00081 }
void DQMEventServer::clearQueue | ( | ) |
Definition at line 245 of file DQMEventServer.cc.
References consumerTable.
00246 { 00247 std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter; 00248 for (consIter = consumerTable.begin(); 00249 consIter != consumerTable.end(); 00250 consIter++) 00251 { 00252 boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second; 00253 consPipe->clearQueue(); 00254 } 00255 }
boost::shared_ptr< DQMConsumerPipe > DQMEventServer::getConsumer | ( | uint32 | consumerId | ) |
Returns a shared pointer to the consumer pipe with the specified ID or an empty pointer if the ID was not found.
Definition at line 87 of file DQMEventServer.cc.
References consumerTable.
00088 { 00089 // initial empty pointer 00090 boost::shared_ptr<DQMConsumerPipe> consPtr; 00091 00092 // lookup the consumer 00093 std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter; 00094 consIter = consumerTable.find(consumerId); 00095 if (consIter != consumerTable.end()) 00096 { 00097 consPtr = consIter->second; 00098 } 00099 00100 // return the pointer 00101 return consPtr; 00102 }
std::map< uint32, boost::shared_ptr<DQMConsumerPipe> > stor::DQMEventServer::getConsumerTable | ( | ) | [inline] |
Definition at line 43 of file DQMEventServer.h.
References consumerTable.
00044 { return(consumerTable);}
boost::shared_ptr< std::vector< char > > DQMEventServer::getDQMEvent | ( | uint32 | consumerId | ) |
Returns the next event for the specified consumer.
Definition at line 227 of file DQMEventServer.cc.
References consumerTable.
00228 { 00229 // initial empty buffer 00230 boost::shared_ptr< vector<char> > bufPtr; 00231 00232 // lookup the consumer 00233 std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter; 00234 consIter = consumerTable.find(consumerId); 00235 if (consIter != consumerTable.end()) 00236 { 00237 boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second; 00238 bufPtr = consPipe->getDQMEvent(); 00239 } 00240 00241 // return the event buffer 00242 return bufPtr; 00243 }
void DQMEventServer::processDQMEvent | ( | const DQMEventMsgView & | eventView | ) |
Processes the specified event.
This includes checking whether the event is allowed to be delivered to consumers based on the maximum event rate specified in the constructor (the constructor takes only a maximum rate, not a prescale, and the rate check is currently commented out in the implementation, so no throttling is actually applied), checking if any consumers are ready to receive events, checking if any consumers are interested in this specific event, making a local copy of the event, and saving the event to be delivered to consumers.
Definition at line 113 of file DQMEventServer.cc.
References consumerTable, edmNew::copy(), disconnectedConsumerTestCounter_, lat::endl(), DQMEventMsgView::eventNumberAtUpdate(), FDEBUG, lastAcceptedEventTime_, NULL, DQMEventMsgView::size(), source, DQMEventMsgView::startAddress(), target, and DQMEventMsgView::topFolderName().
00114 { 00115 // check if we are ready to accept another event 00116 struct timeval now; 00117 // throttle events that occur more frequently than our max allowed rate 00118 struct timezone dummyTZ; 00119 gettimeofday(&now, &dummyTZ); 00120 double timeDiff = (double) now.tv_sec; 00121 timeDiff -= (double) lastAcceptedEventTime_.tv_sec; 00122 timeDiff += ((double) now.tv_usec / 1000000.0); 00123 timeDiff -= ((double) lastAcceptedEventTime_.tv_usec / 1000000.0); 00124 //cout << "timeDiff = " << timeDiff << 00125 // ", minTime = " << minTimeBetweenEvents_ << std::endl; 00126 //if (timeDiff < minTimeBetweenEvents_) {return;} 00127 00128 // actually do not throttle for now as DQMEvents all come at once 00129 // for different top level folders and we can miss the second and 00130 // subsequent top level folders for the same update if we throttle this 00131 // way, we should include the updateAtEventNumber in deciding 00132 // we need a buffer that can hold more than one DQMEvent because all 00133 // FUs are going to send their same folderID DQMEvents at once 00134 00135 // do nothing if the event is empty 00136 if (eventView.size() == 0) {return;} 00137 00138 // loop over the consumers in our list, and for each one check whether 00139 // it is ready for an event and if it wants this specific event. If so, 00140 // create a local copy of the event (if not already done) and pass it 00141 // to the consumer pipe. 
00142 boost::shared_ptr< vector<char> > bufPtr; 00143 std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter; 00144 for (consIter = consumerTable.begin(); 00145 consIter != consumerTable.end(); 00146 consIter++) 00147 { 00148 // test if the consumer is ready and wants the event 00149 boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second; 00150 FDEBUG(5) << "Checking if consumer " << consPipe->getConsumerId() << 00151 " wants update " << eventView.eventNumberAtUpdate() << 00152 " and folder " << eventView.topFolderName() << std::endl; 00153 00154 if (consPipe->isReadyForEvent() && 00155 consPipe->wantsDQMEvent(eventView)) 00156 { 00157 // check if we need to make a local copy of the event 00158 if (bufPtr.get() == NULL) 00159 { 00160 FDEBUG(5) << "Creating a buffer for update " << 00161 eventView.eventNumberAtUpdate() << 00162 " and folder " << eventView.topFolderName() <<std::endl; 00163 00164 // create a local buffer of the appropriate size 00165 boost::shared_ptr< vector<char> > 00166 tmpBufPtr(new vector<char>(eventView.size())); 00167 00168 // copy the data to the local buffer 00169 unsigned char *target = (unsigned char *) &(*tmpBufPtr)[0]; 00170 unsigned char *source = eventView.startAddress(); 00171 int dataSize = eventView.size(); 00172 std::copy(source, source+dataSize, target); 00173 00174 // switch buffers 00175 bufPtr.swap(tmpBufPtr); // what happens to the memory and do we need a delete? 
00176 00177 // update the local time stamp for the latest accepted event 00178 lastAcceptedEventTime_ = now; 00179 } 00180 FDEBUG(5) << "Adding DQMevent to consumer pipe for update " << 00181 eventView.eventNumberAtUpdate() << 00182 " and folder " << eventView.topFolderName() <<std::endl; 00183 00184 // add the event to the consumer pipe 00185 consPipe->putDQMEvent(bufPtr); 00186 } 00187 } 00188 00189 // periodically check for disconnected consumers 00190 disconnectedConsumerTestCounter_++; 00191 if (disconnectedConsumerTestCounter_ >= 500) 00192 { 00193 // reset counter 00194 disconnectedConsumerTestCounter_ = 0; 00195 00196 // determine which consumers have disconnected 00197 std::vector<uint32> disconnectList; 00198 std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter; 00199 for (consIter = consumerTable.begin(); 00200 consIter != consumerTable.end(); 00201 consIter++) 00202 { 00203 boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second; 00204 FDEBUG(5) << "Checking if DQM consumer " << consPipe->getConsumerId() << 00205 " has disconnected " << std::endl; 00206 if (consPipe->isDisconnected()) 00207 { 00208 disconnectList.push_back(consIter->first); 00209 } 00210 } 00211 00212 // remove disconnected consumers from the consumer table 00213 std::vector<uint32>::const_iterator listIter; 00214 for (listIter = disconnectList.begin(); 00215 listIter != disconnectList.end(); 00216 listIter++) 00217 { 00218 uint32 consumerId = *listIter; 00219 consumerTable.erase(consumerId); 00220 } 00221 } 00222 }
std::map< uint32, boost::shared_ptr<DQMConsumerPipe> > stor::DQMEventServer::consumerTable [private] |
Definition at line 57 of file DQMEventServer.h.
Referenced by addConsumer(), clearQueue(), getConsumer(), getConsumerTable(), getDQMEvent(), and processDQMEvent().
int stor::DQMEventServer::disconnectedConsumerTestCounter_ [private] |
Definition at line 54 of file DQMEventServer.h.
Referenced by DQMEventServer(), and processDQMEvent().
struct timeval stor::DQMEventServer::lastAcceptedEventTime_ [read, private] |
Definition at line 51 of file DQMEventServer.h.
Referenced by DQMEventServer(), and processDQMEvent().
const double DQMEventServer::MAX_ACCEPT_INTERVAL = 86400.0 [static, private] |
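Initialize the maximum accept interval, in seconds (86400.0 s = one day). It is used as the upper bound on the minimum time between accepted events.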
Definition at line 49 of file DQMEventServer.h.
Referenced by DQMEventServer().
double stor::DQMEventServer::minTimeBetweenEvents_ [private] |