CMS 3D CMS Logo

stor::DQMEventServer Class Reference

#include <EventFilter/StorageManager/interface/DQMEventServer.h>

List of all members.

Public Member Functions

void addConsumer (boost::shared_ptr< DQMConsumerPipe > consumer)
 Adds the specified consumer to the event server.
void clearQueue ()
 DQMEventServer (double maximumRate)
 DQMEventServer constructor.
boost::shared_ptr
< DQMConsumerPipe
getConsumer (uint32 consumerId)
 Returns a shared pointer to the consumer pipe with the specified ID or an empty pointer if the ID was not found.
std::map< uint32,
boost::shared_ptr
< DQMConsumerPipe > > 
getConsumerTable ()
boost::shared_ptr< std::vector
< char > > 
getDQMEvent (uint32 consumerId)
 Returns the next event for the specified consumer.
void processDQMEvent (const DQMEventMsgView &eventView)
 Processes the specified event.
 ~DQMEventServer ()
 DQMEventServer destructor.

Private Attributes

std::map< uint32,
boost::shared_ptr
< DQMConsumerPipe > > 
consumerTable
int disconnectedConsumerTestCounter_
struct timeval lastAcceptedEventTime_
double minTimeBetweenEvents_

Static Private Attributes

static const double MAX_ACCEPT_INTERVAL = 86400.0
 Initialize the maximum accept interval.


Detailed Description

Definition at line 32 of file DQMEventServer.h.


Constructor & Destructor Documentation

DQMEventServer::DQMEventServer ( double  maximumRate  ) 

DQMEventServer constructor.

One way of throttling DQMevents is supported: specifying a maximimum allowed rate of accepted event. However it is not yet implemented because it should be a maximum rate of accepted updates, since one update can have multiple DQMEvents with the same folderID but different eventAtUpdate from different FUs are about the same time and we want to accept all these to collate them Needs implementation to handle if already collated. Also means we need a buffer that can hold more than one DQMEvent

Definition at line 35 of file DQMEventServer.cc.

References disconnectedConsumerTestCounter_, lastAcceptedEventTime_, MAX_ACCEPT_INTERVAL, and minTimeBetweenEvents_.

00036 {
00037   // determine the amount of time that we need to wait between accepted
00038   // events (to ensure that the event server doesn't send "too many" events
00039   // to consumers).  The maximum rate specified to this constructor is
00040   // converted to an interval that is used internally, and the interval
00041   // is required to be somewhat reasonable.
00042   /* Note that this is not
00043    * yet implemented because it should be a maximum rate of accepted updates,
00044    * since one update can have multiple DQMEvents with the same folderID but
00045    * different eventAtUpdate from different FUs are about the same time and we
00046    * want to accept all these to collate them
00047    * Needs implementation to handle if already collated.
00048    */
00049   if (maximumRate < (1.0 / MAX_ACCEPT_INTERVAL))
00050   {
00051     minTimeBetweenEvents_ = MAX_ACCEPT_INTERVAL;
00052   }
00053   else
00054   {
00055     minTimeBetweenEvents_ = 1.0 / maximumRate;  // seconds
00056   }
00057 
00058   // initialize the last accepted event time to construction time
00059   struct timezone dummyTZ;
00060   gettimeofday(&lastAcceptedEventTime_, &dummyTZ);
00061 
00062   // initialize counters
00063   disconnectedConsumerTestCounter_ = 0;
00064 }

DQMEventServer::~DQMEventServer (  ) 

DQMEventServer destructor.

Definition at line 69 of file DQMEventServer.cc.

References lat::endl(), and FDEBUG.

00070 {
00071   FDEBUG(5) << "Executing destructor for DQMevent server " << std::endl;
00072 }


Member Function Documentation

void DQMEventServer::addConsumer ( boost::shared_ptr< DQMConsumerPipe consumer  ) 

Adds the specified consumer to the event server.

Definition at line 77 of file DQMEventServer.cc.

References consumerTable.

00078 {
00079   uint32 consumerId = consumer->getConsumerId();
00080   consumerTable[consumerId] = consumer;
00081 }

void DQMEventServer::clearQueue (  ) 

Definition at line 245 of file DQMEventServer.cc.

References consumerTable.

00246 {
00247   std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter;
00248   for (consIter = consumerTable.begin();
00249        consIter != consumerTable.end();
00250        consIter++)
00251   {
00252     boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second;
00253     consPipe->clearQueue();
00254   }
00255 }

boost::shared_ptr< DQMConsumerPipe > DQMEventServer::getConsumer ( uint32  consumerId  ) 

Returns a shared pointer to the consumer pipe with the specified ID or an empty pointer if the ID was not found.

Definition at line 87 of file DQMEventServer.cc.

References consumerTable.

00088 {
00089   // initial empty pointer
00090   boost::shared_ptr<DQMConsumerPipe> consPtr;
00091 
00092   // lookup the consumer
00093   std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter;
00094   consIter = consumerTable.find(consumerId);
00095   if (consIter != consumerTable.end())
00096   {
00097     consPtr = consIter->second;
00098   }
00099 
00100   // return the pointer
00101   return consPtr;
00102 }

std::map< uint32, boost::shared_ptr<DQMConsumerPipe> > stor::DQMEventServer::getConsumerTable (  )  [inline]

Definition at line 43 of file DQMEventServer.h.

References consumerTable.

00044     { return(consumerTable);}

boost::shared_ptr< std::vector< char > > DQMEventServer::getDQMEvent ( uint32  consumerId  ) 

Returns the next event for the specified consumer.

Definition at line 227 of file DQMEventServer.cc.

References consumerTable.

00228 {
00229   // initial empty buffer
00230   boost::shared_ptr< vector<char> > bufPtr;
00231 
00232   // lookup the consumer
00233   std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter;
00234   consIter = consumerTable.find(consumerId);
00235   if (consIter != consumerTable.end())
00236   {
00237     boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second;
00238     bufPtr = consPipe->getDQMEvent();
00239   }
00240 
00241   // return the event buffer
00242   return bufPtr;
00243 }

void DQMEventServer::processDQMEvent ( const DQMEventMsgView eventView  ) 

Processes the specified event.

This includes checking whether the event is allowed to be delivered to consumers based on the prescale and maximum event rate specified in the constructor, checking if any consumers are ready to receive events, checking if any consumers are interested in this specific event, making a local copy of the event, and saving the event to be delivered to consumers.

Definition at line 113 of file DQMEventServer.cc.

References consumerTable, edmNew::copy(), disconnectedConsumerTestCounter_, lat::endl(), DQMEventMsgView::eventNumberAtUpdate(), FDEBUG, lastAcceptedEventTime_, NULL, DQMEventMsgView::size(), source, DQMEventMsgView::startAddress(), target, and DQMEventMsgView::topFolderName().

00114 {
00115   // check if we are ready to accept another event
00116   struct timeval now;
00117   // throttle events that occur more frequently than our max allowed rate
00118   struct timezone dummyTZ;
00119   gettimeofday(&now, &dummyTZ);
00120   double timeDiff = (double) now.tv_sec;
00121   timeDiff -= (double) lastAcceptedEventTime_.tv_sec;
00122   timeDiff += ((double) now.tv_usec / 1000000.0);
00123   timeDiff -= ((double) lastAcceptedEventTime_.tv_usec / 1000000.0);
00124   //cout << "timeDiff = " << timeDiff <<
00125   //  ", minTime = " << minTimeBetweenEvents_ << std::endl;
00126   //if (timeDiff < minTimeBetweenEvents_) {return;}
00127 
00128   // actually do not throttle for now as DQMEvents all come at once
00129   // for different top level folders and we can miss the second and
00130   // subsequent top level folders for the same update if we throttle this
00131   // way, we should include the updateAtEventNumber in deciding
00132   // we need a buffer that can hold more than one DQMEvent because all
00133   // FUs are going to send their same folderID DQMEvents at once
00134 
00135   // do nothing if the event is empty
00136   if (eventView.size() == 0) {return;}
00137 
00138   // loop over the consumers in our list, and for each one check whether
00139   // it is ready for an event and if it wants this specific event.  If so,
00140   // create a local copy of the event (if not already done) and pass it
00141   // to the consumer pipe.
00142   boost::shared_ptr< vector<char> > bufPtr;
00143   std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter;
00144   for (consIter = consumerTable.begin();
00145        consIter != consumerTable.end();
00146        consIter++)
00147   {
00148     // test if the consumer is ready and wants the event
00149     boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second;
00150     FDEBUG(5) << "Checking if consumer " << consPipe->getConsumerId() <<
00151       " wants update " << eventView.eventNumberAtUpdate() << 
00152       " and folder " << eventView.topFolderName() << std::endl;
00153     
00154     if (consPipe->isReadyForEvent() &&
00155         consPipe->wantsDQMEvent(eventView))
00156     {
00157       // check if we need to make a local copy of the event
00158       if (bufPtr.get() == NULL)
00159       {
00160         FDEBUG(5) << "Creating a buffer for update " <<
00161           eventView.eventNumberAtUpdate() << 
00162           " and folder " << eventView.topFolderName() <<std::endl;
00163 
00164         // create a local buffer of the appropriate size
00165         boost::shared_ptr< vector<char> >
00166           tmpBufPtr(new vector<char>(eventView.size()));
00167 
00168         // copy the data to the local buffer
00169         unsigned char *target = (unsigned char *) &(*tmpBufPtr)[0];
00170         unsigned char *source = eventView.startAddress();
00171         int dataSize = eventView.size();
00172         std::copy(source, source+dataSize, target);
00173 
00174         // switch buffers
00175         bufPtr.swap(tmpBufPtr); // what happens to the memory and do we need a delete?
00176 
00177         // update the local time stamp for the latest accepted event
00178         lastAcceptedEventTime_ = now;
00179       }
00180       FDEBUG(5) << "Adding DQMevent to consumer pipe for update " <<
00181           eventView.eventNumberAtUpdate() << 
00182           " and folder " << eventView.topFolderName() <<std::endl;
00183 
00184       // add the event to the consumer pipe
00185       consPipe->putDQMEvent(bufPtr);
00186     }
00187   }
00188 
00189   // periodically check for disconnected consumers
00190   disconnectedConsumerTestCounter_++;
00191   if (disconnectedConsumerTestCounter_ >= 500)
00192   {
00193     // reset counter
00194     disconnectedConsumerTestCounter_ = 0;
00195 
00196     // determine which consumers have disconnected
00197     std::vector<uint32> disconnectList;
00198     std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator consIter;
00199     for (consIter = consumerTable.begin();
00200          consIter != consumerTable.end();
00201          consIter++)
00202     {
00203       boost::shared_ptr<DQMConsumerPipe> consPipe = consIter->second;
00204       FDEBUG(5) << "Checking if DQM consumer " << consPipe->getConsumerId() <<
00205         " has disconnected " << std::endl;
00206       if (consPipe->isDisconnected())
00207       {
00208         disconnectList.push_back(consIter->first);
00209       }
00210     }
00211 
00212     // remove disconnected consumers from the consumer table
00213     std::vector<uint32>::const_iterator listIter;
00214     for (listIter = disconnectList.begin();
00215          listIter != disconnectList.end();
00216          listIter++)
00217     {
00218       uint32 consumerId = *listIter;
00219       consumerTable.erase(consumerId);
00220     }
00221   }
00222 }


Member Data Documentation

std::map< uint32, boost::shared_ptr<DQMConsumerPipe> > stor::DQMEventServer::consumerTable [private]

Definition at line 57 of file DQMEventServer.h.

Referenced by addConsumer(), clearQueue(), getConsumer(), getConsumerTable(), getDQMEvent(), and processDQMEvent().

int stor::DQMEventServer::disconnectedConsumerTestCounter_ [private]

Definition at line 54 of file DQMEventServer.h.

Referenced by DQMEventServer(), and processDQMEvent().

struct timeval stor::DQMEventServer::lastAcceptedEventTime_ [read, private]

Definition at line 51 of file DQMEventServer.h.

Referenced by DQMEventServer(), and processDQMEvent().

const double DQMEventServer::MAX_ACCEPT_INTERVAL = 86400.0 [static, private]

Initialize the maximum accept interval.

Definition at line 49 of file DQMEventServer.h.

Referenced by DQMEventServer().

double stor::DQMEventServer::minTimeBetweenEvents_ [private]

Definition at line 50 of file DQMEventServer.h.

Referenced by DQMEventServer().


The documentation for this class was generated from the following files:
Generated on Tue Jun 9 18:52:50 2009 for CMSSW by  doxygen 1.5.4