#include <EventFilter/StorageManager/interface/DQMServiceManager.h>
Public Member Functions | |
DQMServiceManager (std::string filePrefix="/tmp/DQM", int purgeTime=DEFAULT_PURGE_TIME, int readyTime=DEFAULT_READY_TIME, bool collateDQM=false, bool archiveDQM=false, int archiveInterval=0, bool useCompression=true, int compressionLevel=1) | |
DQMGroupDescriptor * | getBestDQMGroupDescriptor (std::string groupName) |
void | manageDQMEventMsg (DQMEventMsgView &msg) |
void | setArchiveDQM (bool archiveDQM) |
void | setArchiveInterval (int archiveInterval) |
void | setCollateDQM (bool collateDQM) |
void | setCompressionLevel (int compressionLevel) |
void | setDQMEventServer (boost::shared_ptr< DQMEventServer > &es) |
void | setFilePrefix (std::string filePrefix) |
void | setPurgeTime (int purgeTime) |
void | setReadyTime (int readyTime) |
void | setUseCompression (bool useCompression) |
void | stop () |
~DQMServiceManager () | |
Protected Types | |
enum | { DEFAULT_PURGE_TIME = 120, DEFAULT_READY_TIME = 30 } |
Protected Member Functions | |
DQMInstance * | findDQMInstance (int runNumber, int lumiSection, int instance) |
int | writeAndPurgeDQMInstances (bool purgeAll=false) |
Protected Attributes | |
bool | archiveDQM_ |
int | archiveInterval_ |
bool | collateDQM_ |
int | compressionLevel_ |
boost::shared_ptr< DQMEventServer > | DQMeventServer_ |
std::vector< DQMInstance * > | dqmInstances_ |
std::string | filePrefix_ |
int | nUpdates_ |
int | purgeTime_ |
int | readyTime_ |
bool | useCompression_ |
Definition at line 24 of file DQMServiceManager.h.
anonymous enum [protected] |
Definition at line 75 of file DQMServiceManager.h.
00076   {
00077     DEFAULT_PURGE_TIME = 120,
00078     DEFAULT_READY_TIME = 30
00079   };
DQMServiceManager::DQMServiceManager | ( | std::string | filePrefix = "/tmp/DQM" , |
|
int | purgeTime = DEFAULT_PURGE_TIME , |
|||
int | readyTime = DEFAULT_READY_TIME , |
|||
bool | collateDQM = false , |
|||
bool | archiveDQM = false , |
|||
int | archiveInterval = 0 , |
|||
bool | useCompression = true , |
|||
int | compressionLevel = 1 | |||
) | [explicit] |
Definition at line 20 of file DQMServiceManager.cc.
References dqmInstances_.
00027   :
00028   useCompression_(useCompression),
00029   compressionLevel_(compressionLevel),
00030   collateDQM_(collateDQM),
00031   archiveDQM_(archiveDQM),
00032   archiveInterval_(archiveInterval),
00033   nUpdates_(0),
00034   filePrefix_(filePrefix),
00035   purgeTime_(purgeTime),
00036   readyTime_(readyTime)
00037 {
00038   dqmInstances_.reserve(20);
00039
00040   gROOT->SetBatch(kTRUE);
00041 }
DQMServiceManager::~DQMServiceManager | ( | ) |
DQMInstance * DQMServiceManager::findDQMInstance | ( | int | runNumber, | |
int | lumiSection, | |||
int | instance | |||
) | [protected] |
Definition at line 185 of file DQMServiceManager.cc.
References dqmInstances_, i, n, and NULL.
Referenced by manageDQMEventMsg().
00188 {
00189   DQMInstance * reply = NULL;
00190   int n = dqmInstances_.size();
00191   for ( int i=0; (i<n) && (reply==NULL); i++)
00192   {
00193     if ( ( dqmInstances_[i]->getRunNumber() == runNumber ) &&
00194          ( dqmInstances_[i]->getLumiSection() == lumiSection ) &&
00195          ( dqmInstances_[i]->getInstance() == instance ) )
00196     { reply = dqmInstances_[i]; }
00197   }
00198   return(reply);
00199 }
DQMGroupDescriptor * DQMServiceManager::getBestDQMGroupDescriptor | ( | std::string | groupName | ) |
Definition at line 258 of file DQMServiceManager.cc.
References dqmInstances_, stor::DQMInstance::getDQMGroup(), stor::DQMGroup::getLastUpdate(), group, stor::DQMGroup::isReady(), and NULL.
Referenced by manageDQMEventMsg().
00259 {
00260   DQMGroupDescriptor * reply = NULL;
00261   TTimeStamp now;
00262   now.Set();
00263
00264   DQMInstance * newestInstance = NULL;
00265   DQMGroup * newestGroup = NULL;
00266   int maxTime = 0;
00267   for (std::vector<DQMInstance * >::iterator i0 =
00268          dqmInstances_.begin(); i0 != dqmInstances_.end() ; ++i0)
00269   {
00270     DQMInstance * instance = *i0;
00271     DQMGroup * group = instance->getDQMGroup(groupName);
00272     if (group != NULL && group->isReady(now.GetSec()))
00273     {
00274       TTimeStamp * zeit = group->getLastUpdate();
00275       if ( ( zeit != NULL ) && ( zeit->GetSec() > maxTime ))
00276       {
00277         maxTime = zeit->GetSec();
00278         newestInstance = instance;
00279         newestGroup = group;
00280       }
00281     }
00282   }
00283
00284   if ( ( newestInstance != NULL ) &&
00285        ( newestGroup != NULL ) )
00286   { reply = new DQMGroupDescriptor(newestInstance,newestGroup); }
00287
00288   return(reply);
00289 }
void DQMServiceManager::manageDQMEventMsg | ( | DQMEventMsgView & | msg | ) |
Definition at line 49 of file DQMServiceManager.cc.
References edm::StreamDQMSerializer::bufferPointer(), collateDQM_, compressionLevel_, edmNew::copy(), edm::StreamDQMSerializer::currentEventSize(), edm::StreamDQMSerializer::currentSpaceUsed(), edm::StreamDQMDeserializer::deserializeDQMEvent(), DQMeventServer_, stor::DQMGroup::dqmFolders_, dqmInstances_, stor::DQMFolder::dqmObjects_, lat::endl(), DQMEventMsgView::eventNumberAtUpdate(), FDEBUG, findDQMInstance(), getBestDQMGroupDescriptor(), stor::DQMInstance::getInstance(), stor::DQMGroup::getLastEvent(), stor::DQMGroup::getLastUpdate(), stor::DQMInstance::getLumiSection(), stor::DQMInstance::getRunNumber(), group, stor::DQMGroupDescriptor::group_, i1, i2, stor::DQMGroupDescriptor::instance_, int, DQMEventMsgView::lumiSection(), NULL, purgeTime_, readyTime_, DQMEventMsgView::releaseTag(), DQMEventMsgView::runNumber(), edm::StreamDQMSerializer::serializeDQMEvent(), stor::DQMGroup::setServedSinceUpdate(), source, edm::table, DQMEventMsgView::topFolderName(), DQMEventMsgView::updateNumber(), stor::DQMInstance::updateObject(), useCompression_, stor::DQMGroup::wasServedSinceUpdate(), and writeAndPurgeDQMInstances().
00050 {
00051   // At the moment implement the behaviour such that collateDQM = archive DQM
00052   // so if we don't collate we also don't archive, else we need changes here
00053   if(!collateDQM_)
00054   {
00055     // no collation just pass the DQMEvent to the Event Server and return
00056     if (DQMeventServer_.get() != NULL)
00057     {
00058       DQMeventServer_->processDQMEvent(msg);
00059     }
00060     return;
00061   }
00062
00063   DQMInstance * dqm = findDQMInstance(msg.runNumber(),
00064                                       msg.lumiSection(),
00065                                       msg.updateNumber());
00066   if ( dqm == NULL )
00067   {
00068     dqm = new DQMInstance(msg.runNumber(),
00069                           msg.lumiSection(),
00070                           msg.updateNumber(),
00071                           purgeTime_,
00072                           readyTime_);
00073     dqmInstances_.push_back(dqm);
00074     int preSize = dqmInstances_.size();
00075
00076     // At this point, purge old instances from the list
00077     writeAndPurgeDQMInstances(false);
00078     FDEBUG(4) << "Live DQMInstances before purge " << preSize <<
00079       " and after " << dqmInstances_.size() << std::endl;
00080   }
00081
00082   edm::StreamDQMDeserializer deserializer;
00083   std::auto_ptr<DQMEvent::TObjectTable> toTablePtr =
00084     deserializer.deserializeDQMEvent(msg);
00085
00086   DQMEvent::TObjectTable::const_iterator toIter;
00087   for (toIter = toTablePtr->begin();
00088        toIter != toTablePtr->end(); toIter++)
00089   {
00090     std::string subFolderName = toIter->first;
00091     std::vector<TObject *> toList = toIter->second;
00092
00093     for (int tdx = 0; tdx < (int) toList.size(); tdx++)
00094     {
00095       TObject *object = toList[tdx];
00096       dqm->updateObject(msg.topFolderName(),
00097                         subFolderName,
00098                         object,
00099                         msg.eventNumberAtUpdate());
00100       delete(object);
00101     }
00102   }
00103
00104   // Now send the best DQMGroup for this grouping, which may
00105   // not be the currently updated one (it may not yet be ready)
00106   DQMGroupDescriptor * descriptor =
00107     getBestDQMGroupDescriptor(msg.topFolderName());
00108   if ( descriptor != NULL )
00109   {
00110     // Reserialize the data and give to DQM server
00111     DQMGroup * group = descriptor->group_;
00112     if ( !group->wasServedSinceUpdate() )
00113     {
00114       group->setServedSinceUpdate();
00115       DQMInstance * instance = descriptor->instance_;
00116
00117       // Package list of TObjects into a DQMEvent::TObjectTable
00118       DQMEvent::TObjectTable table;
00119
00120       int subFolderSize = 0;
00121       for ( std::map<std::string, DQMFolder *>::iterator i1 =
00122               group->dqmFolders_.begin(); i1 != group->dqmFolders_.end(); ++i1)
00123       {
00124         std::string folderName = i1->first;
00125         DQMFolder * folder = i1->second;
00126         for ( std::map<std::string, TObject *>::iterator i2 =
00127                 folder->dqmObjects_.begin(); i2!=folder->dqmObjects_.end(); ++i2)
00128         {
00129           std::string objectName = i2->first;
00130           TObject *object = i2->second;
00131           if ( object != NULL )
00132           {
00133             if ( table.count(folderName) == 0 )
00134             {
00135               std::vector<TObject *> newObjectVector;
00136               table[folderName] = newObjectVector;
00137               subFolderSize += 2*sizeof(uint32) + folderName.length();
00138             }
00139             std::vector<TObject *> &objectVector = table[folderName];
00140             objectVector.push_back(object);
00141           }
00142         }
00143       }
00144
00145       edm::StreamDQMSerializer serializer;
00146       serializer.serializeDQMEvent(table,
00147                                    useCompression_,
00148                                    compressionLevel_);
00149       // Add space for header
00150       unsigned int sourceSize = serializer.currentSpaceUsed();
00151       unsigned int totalSize = sourceSize
00152         + sizeof(DQMEventHeader)
00153         + 12*sizeof(uint32)
00154         + msg.releaseTag().length()
00155         + msg.topFolderName().length()
00156         + subFolderSize;
00157       unsigned char * buffer = (unsigned char *)malloc(totalSize);
00158
00159       edm::Timestamp zeit( ( (unsigned long long)group->getLastUpdate()->GetSec() << 32 ) |
00160                            ( group->getLastUpdate()->GetNanoSec()));
00161
00162       DQMEventMsgBuilder builder((void *)&buffer[0],
00163                                  totalSize,
00164                                  instance->getRunNumber(),
00165                                  group->getLastEvent(),
00166                                  zeit,
00167                                  instance->getLumiSection(),
00168                                  instance->getInstance(),
00169                                  msg.releaseTag(),
00170                                  msg.topFolderName(),
00171                                  table);
00172       unsigned char * source = serializer.bufferPointer();
00173       std::copy(source,source+sourceSize, builder.eventAddress());
00174       builder.setEventLength(sourceSize);
00175       if ( useCompression_ ) { builder.setCompressionFlag(serializer.currentEventSize()); }
00176       DQMEventMsgView serveMessage(&buffer[0]);
00177       DQMeventServer_->processDQMEvent(serveMessage);
00178
00179       free(buffer);
00180     }
00181     delete(descriptor);
00182   }
00183 }
void stor::DQMServiceManager::setArchiveDQM | ( | bool | archiveDQM | ) | [inline] |
Definition at line 47 of file DQMServiceManager.h.
References archiveDQM_.
00047 { archiveDQM_ = archiveDQM; }
void stor::DQMServiceManager::setArchiveInterval | ( | int | archiveInterval | ) | [inline] |
Definition at line 48 of file DQMServiceManager.h.
References archiveInterval_.
00048 { archiveInterval_ = archiveInterval; }
void stor::DQMServiceManager::setCollateDQM | ( | bool | collateDQM | ) | [inline] |
Definition at line 46 of file DQMServiceManager.h.
References collateDQM_.
00046 { collateDQM_ = collateDQM; }
void stor::DQMServiceManager::setCompressionLevel | ( | int | compressionLevel | ) | [inline] |
Definition at line 53 of file DQMServiceManager.h.
References compressionLevel_.
00054 { compressionLevel_ = compressionLevel;}
void stor::DQMServiceManager::setDQMEventServer | ( | boost::shared_ptr< DQMEventServer > & | es | ) | [inline] |
Definition at line 49 of file DQMServiceManager.h.
References DQMeventServer_.
00050 { DQMeventServer_ = es; }
void stor::DQMServiceManager::setFilePrefix | ( | std::string | filePrefix | ) | [inline] |
Definition at line 43 of file DQMServiceManager.h.
References filePrefix_.
00044 { filePrefix_ = filePrefix;}
void stor::DQMServiceManager::setPurgeTime | ( | int | purgeTime | ) | [inline] |
Definition at line 41 of file DQMServiceManager.h.
References purgeTime_.
00041 { purgeTime_ = purgeTime;}
void stor::DQMServiceManager::setReadyTime | ( | int | readyTime | ) | [inline] |
Definition at line 42 of file DQMServiceManager.h.
References readyTime_.
00042 { readyTime_ = readyTime;}
void stor::DQMServiceManager::setUseCompression | ( | bool | useCompression | ) | [inline] |
Definition at line 51 of file DQMServiceManager.h.
References useCompression_.
00052 { useCompression_ = useCompression;}
void DQMServiceManager::stop | ( | ) |
Definition at line 46 of file DQMServiceManager.cc.
References collateDQM_, and writeAndPurgeDQMInstances().
Referenced by ~DQMServiceManager().
00047 { if(collateDQM_) writeAndPurgeDQMInstances(true);}
int DQMServiceManager::writeAndPurgeDQMInstances | ( | bool | writeAll = false | ) | [protected] |
Definition at line 201 of file DQMServiceManager.cc.
References archiveDQM_, archiveInterval_, dqmInstances_, filePrefix_, stor::DQMInstance::getLumiSection(), stor::DQMInstance::isReady(), stor::DQMInstance::isStale(), n, and stor::DQMInstance::writeFile().
Referenced by manageDQMEventMsg(), and stop().
00202 {
00203   int reply = 0;
00204
00205   TTimeStamp now;
00206   now.Set();
00207
00208   // Always keep at least one instance in memory, unless we're
00209   // writing everything
00210   int minInstances = 1;
00211   if ( writeAll ) { minInstances=0;}
00212
00213   // if we're writing everything, find the last instance that is ready
00214   int listSizeWithOneReady = 0;
00215   if ( writeAll )
00216   {
00217     std::vector<DQMInstance *>::reverse_iterator r0 = dqmInstances_.rbegin();
00218     while ( ( r0 != dqmInstances_.rend() ) )
00219     {
00220       ++listSizeWithOneReady;
00221       DQMInstance * instance = *r0;
00222       if (instance->isReady(now.GetSec()))
00223       {
00224         break;
00225       }
00226       ++r0;
00227     }
00228   }
00229
00230   int n = dqmInstances_.size();
00231   std::vector<DQMInstance *>::iterator i0 = dqmInstances_.begin();
00232   while ( ( i0 != dqmInstances_.end() ) && ( n>minInstances) )
00233   {
00234     DQMInstance * instance = *i0;
00235     if ( instance->isStale(now.GetSec()) || writeAll)
00236     {
00237       if (archiveDQM_ && instance->isReady(now.GetSec()) &&
00238           ((archiveInterval_ > 0 &&
00239             (instance->getLumiSection() % archiveInterval_) == 0)
00240            || (writeAll && n == listSizeWithOneReady)))
00241       {
00242         instance->writeFile(filePrefix_,
00243                             (writeAll && n == listSizeWithOneReady));
00244       }
00245       delete(instance);
00246       reply++;
00247       i0 = dqmInstances_.erase(i0);
00248     }
00249     else
00250     {
00251       ++i0;
00252     }
00253     n = dqmInstances_.size();
00254   }
00255   return(reply);
00256 }
bool stor::DQMServiceManager::archiveDQM_ [protected] |
Definition at line 61 of file DQMServiceManager.h.
Referenced by setArchiveDQM(), and writeAndPurgeDQMInstances().
int stor::DQMServiceManager::archiveInterval_ [protected] |
Definition at line 62 of file DQMServiceManager.h.
Referenced by setArchiveInterval(), and writeAndPurgeDQMInstances().
bool stor::DQMServiceManager::collateDQM_ [protected] |
Definition at line 60 of file DQMServiceManager.h.
Referenced by manageDQMEventMsg(), setCollateDQM(), and stop().
int stor::DQMServiceManager::compressionLevel_ [protected] |
Definition at line 59 of file DQMServiceManager.h.
Referenced by manageDQMEventMsg(), and setCompressionLevel().
boost::shared_ptr<DQMEventServer> stor::DQMServiceManager::DQMeventServer_ [protected] |
Definition at line 74 of file DQMServiceManager.h.
Referenced by manageDQMEventMsg(), and setDQMEventServer().
std::vector<DQMInstance *> stor::DQMServiceManager::dqmInstances_ [protected] |
Definition at line 68 of file DQMServiceManager.h.
Referenced by DQMServiceManager(), findDQMInstance(), getBestDQMGroupDescriptor(), manageDQMEventMsg(), and writeAndPurgeDQMInstances().
std::string stor::DQMServiceManager::filePrefix_ [protected] |
Definition at line 64 of file DQMServiceManager.h.
Referenced by setFilePrefix(), and writeAndPurgeDQMInstances().
int stor::DQMServiceManager::nUpdates_ [protected] |
Definition at line 63 of file DQMServiceManager.h.
int stor::DQMServiceManager::purgeTime_ [protected] |
Definition at line 65 of file DQMServiceManager.h.
Referenced by manageDQMEventMsg(), and setPurgeTime().
int stor::DQMServiceManager::readyTime_ [protected] |
Definition at line 66 of file DQMServiceManager.h.
Referenced by manageDQMEventMsg(), and setReadyTime().
bool stor::DQMServiceManager::useCompression_ [protected] |
Definition at line 58 of file DQMServiceManager.h.
Referenced by manageDQMEventMsg(), and setUseCompression().