Go to the documentation of this file.00001 #if !WITHOUT_CMS_FRAMEWORK
00002 # include "DQMServices/Core/src/DQMService.h"
00003 # include "DQMServices/Core/interface/DQMNet.h"
00004 # include "DQMServices/Core/interface/DQMStore.h"
00005 # include "DQMServices/Core/interface/DQMScope.h"
00006 # include "DQMServices/Core/interface/MonitorElement.h"
00007 # include "FWCore/ServiceRegistry/interface/Service.h"
00008 # include "classlib/utils/Regexp.h"
00009 # include "classlib/utils/Error.h"
00010 # include <pthread.h>
00011 # include <iostream>
00012 # include <string>
00013 # include <memory>
00014 #include "TBufferFile.h"
00015
00016
00017 static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
00018
00021 DQMScope::DQMScope(void)
00022 { pthread_mutex_lock(&s_mutex); }
00023
00025 DQMScope::~DQMScope(void)
00026 { pthread_mutex_unlock(&s_mutex); }
00027
00029 static void
00030 restrictDQMAccess(void)
00031 { pthread_mutex_lock(&s_mutex); }
00032
00033 static void
00034 restrictDQMAccessM(const edm::ModuleDescription &)
00035 { restrictDQMAccess(); }
00036
00038 static void
00039 releaseDQMAccess(void)
00040 { pthread_mutex_unlock(&s_mutex); }
00041
00042 static void
00043 releaseDQMAccessM(const edm::ModuleDescription &)
00044 { releaseDQMAccess(); }
00045
00046
00047 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
00048 : store_(&*edm::Service<DQMStore>()),
00049 net_(0),
00050 filter_(0),
00051 lastFlush_(0),
00052 publishFrequency_(5.0)
00053 {
00054 ar.watchPreSourceConstruction(&restrictDQMAccessM);
00055 ar.watchPostSourceConstruction(&releaseDQMAccessM);
00056 ar.watchPreSource(&restrictDQMAccess);
00057 ar.watchPostSource(&releaseDQMAccess);
00058 ar.watchPreModule(&restrictDQMAccessM);
00059 ar.watchPostModule(&releaseDQMAccessM);
00060 ar.watchPostProcessEvent(this, &DQMService::flush);
00061 ar.watchPostEndJob(this, &DQMService::shutdown);
00062
00063 std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
00064 int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
00065 bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
00066 publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
00067 std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
00068
00069 if (host != "" && port > 0)
00070 {
00071 net_ = new DQMBasicNet;
00072 net_->debug(verbose);
00073 net_->updateToCollector(host, port);
00074 net_->start();
00075 }
00076
00077 if (! filter.empty())
00078 {
00079 try
00080 {
00081 filter_ = new lat::Regexp(filter);
00082 if (! filter_->valid())
00083 throw cms::Exception("DQMService")
00084 << "Invalid 'filter' parameter value '" << filter << "':"
00085 << " bad regular expression syntax at character "
00086 << filter_->errorOffset() << ": " << filter_->errorMessage();
00087 filter_->study();
00088 }
00089 catch (lat::Error &e)
00090 {
00091 throw cms::Exception("DQMService")
00092 << "Invalid regular expression 'filter' parameter value '"
00093 << filter << "': " << e.explain();
00094 }
00095 }
00096 }
00097
00098 DQMService::~DQMService(void)
00099 {
00100 shutdown();
00101 }
00102
00103
00104
00105
00106 void DQMService::flushStandalone()
00107 {
00108
00109 uint64_t version = lat::Time::current().ns();
00110 double vtime = version * 1e-9;
00111 if (vtime - lastFlush_ < publishFrequency_)
00112 return;
00113
00114
00115 if (net_)
00116 {
00117 DQMNet::Object o;
00118 std::set<std::string> seen;
00119 std::string fullpath;
00120
00121
00122 net_->lock();
00123 bool updated = false;
00124
00125
00126 DQMStore::MEMap::iterator i, e;
00127 net_->reserveLocalSpace(store_->data_.size());
00128 for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
00129 {
00130 const MonitorElement &me = *i;
00131 fullpath.clear();
00132 fullpath += *me.data_.dirname;
00133 if (! me.data_.dirname->empty())
00134 fullpath += '/';
00135 fullpath += me.data_.objname;
00136
00137 if (filter_ && filter_->search(fullpath) < 0)
00138 continue;
00139
00140 seen.insert(fullpath);
00141 if (! me.wasUpdated())
00142 continue;
00143
00144 o.lastreq = 0;
00145 o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
00146 o.flags = me.data_.flags;
00147 o.tag = me.data_.tag;
00148 o.version = version;
00149 o.dirname = me.data_.dirname;
00150 o.objname = me.data_.objname;
00151 assert(o.rawdata.empty());
00152 assert(o.scalar.empty());
00153 assert(o.qdata.empty());
00154
00155
00156 switch (me.kind())
00157 {
00158 case MonitorElement::DQM_KIND_INT:
00159 case MonitorElement::DQM_KIND_REAL:
00160 case MonitorElement::DQM_KIND_STRING:
00161 me.packScalarData(o.scalar, "");
00162 break;
00163
00164 default:
00165 {
00166 TBufferFile buffer(TBufferFile::kWrite);
00167 buffer.WriteObject(me.object_);
00168 if (me.reference_)
00169 buffer.WriteObject(me.reference_);
00170 else
00171 buffer.WriteObjectAny(0, 0);
00172 o.rawdata.resize(buffer.Length());
00173 memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
00174 DQMNet::packQualityData(o.qdata, me.data_.qreports);
00175 break;
00176 }
00177 }
00178
00179
00180 net_->updateLocalObject(o);
00181 DQMNet::DataBlob().swap(o.rawdata);
00182 std::string().swap(o.scalar);
00183 std::string().swap(o.qdata);
00184 updated = true;
00185 }
00186
00187
00188 if (net_->removeLocalExcept(seen))
00189 updated = true;
00190
00191
00192 net_->unlock();
00193
00194
00195 if (updated)
00196 net_->sendLocalChanges();
00197 }
00198
00199 store_->reset();
00200 lastFlush_ = lat::Time::current().ns() * 1e-9;
00201
00202
00203 }
00204 void
00205 DQMService::flush(const edm::Event &, const edm::EventSetup &)
00206 {
00207
00208 flushStandalone();
00209 }
00210
00211
00212 void
00213 DQMService::shutdown(void)
00214 {
00215
00216 if (net_)
00217 net_->shutdown();
00218 }
00219
00220 #endif // !WITHOUT_CMS_FRAMEWORK