Go to the documentation of this file.00001 #if !WITHOUT_CMS_FRAMEWORK
00002 # include "DQMServices/Core/src/DQMService.h"
00003 # include "DQMServices/Core/interface/DQMNet.h"
00004 # include "DQMServices/Core/interface/DQMStore.h"
00005 # include "DQMServices/Core/interface/DQMScope.h"
00006 # include "DQMServices/Core/interface/MonitorElement.h"
00007 # include "FWCore/ServiceRegistry/interface/Service.h"
00008 # include "classlib/utils/Regexp.h"
00009 # include "classlib/utils/Error.h"
00010 # include <pthread.h>
00011 # include <iostream>
00012 # include <string>
00013 # include <memory>
00014 #include "TBufferFile.h"
00015
00016
00017 static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
00018
00021 DQMScope::DQMScope(void)
00022 { pthread_mutex_lock(&s_mutex); }
00023
00025 DQMScope::~DQMScope(void)
00026 { pthread_mutex_unlock(&s_mutex); }
00027
00029 static void
00030 restrictDQMAccess(void)
00031 { pthread_mutex_lock(&s_mutex); }
00032
00033 static void
00034 restrictDQMAccessM(const edm::ModuleDescription &)
00035 { restrictDQMAccess(); }
00036
00038 static void
00039 releaseDQMAccess(void)
00040 { pthread_mutex_unlock(&s_mutex); }
00041
00042 static void
00043 releaseDQMAccessM(const edm::ModuleDescription &)
00044 { releaseDQMAccess(); }
00045
00046
00047 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
00048 : store_(&*edm::Service<DQMStore>()),
00049 net_(0),
00050 filter_(0),
00051 lastFlush_(0),
00052 publishFrequency_(5.0)
00053 {
00054 ar.watchPreSourceConstruction(&restrictDQMAccessM);
00055 ar.watchPostSourceConstruction(&releaseDQMAccessM);
00056 ar.watchPreSource(&restrictDQMAccess);
00057 ar.watchPostSource(&releaseDQMAccess);
00058 ar.watchPreModule(&restrictDQMAccessM);
00059 ar.watchPostModule(&releaseDQMAccessM);
00060 ar.watchPostProcessEvent(this, &DQMService::flush);
00061 ar.watchPostEndJob(this, &DQMService::shutdown);
00062
00063 std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
00064 int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
00065 bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
00066 publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
00067 std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
00068
00069 if (host != "" && port > 0)
00070 {
00071 net_ = new DQMBasicNet;
00072 net_->debug(verbose);
00073 net_->updateToCollector(host, port);
00074 net_->start();
00075 }
00076
00077 if (! filter.empty())
00078 {
00079 try
00080 {
00081 filter_ = new lat::Regexp(filter);
00082 if (! filter_->valid())
00083 throw cms::Exception("DQMService")
00084 << "Invalid 'filter' parameter value '" << filter << "':"
00085 << " bad regular expression syntax at character "
00086 << filter_->errorOffset() << ": " << filter_->errorMessage();
00087 filter_->study();
00088 }
00089 catch (lat::Error &e)
00090 {
00091 throw cms::Exception("DQMService")
00092 << "Invalid regular expression 'filter' parameter value '"
00093 << filter << "': " << e.explain();
00094 }
00095 }
00096 }
00097
00098 DQMService::~DQMService(void)
00099 {
00100 shutdown();
00101 }
00102
00103
00104
00105
00106 void
00107 DQMService::flush(const edm::Event &, const edm::EventSetup &)
00108 {
00109
00110 uint64_t version = lat::Time::current().ns();
00111 double vtime = version * 1e-9;
00112 if (vtime - lastFlush_ < publishFrequency_)
00113 return;
00114
00115
00116 if (net_)
00117 {
00118 DQMNet::Object o;
00119 std::set<std::string> seen;
00120 std::string fullpath;
00121
00122
00123 net_->lock();
00124 bool updated = false;
00125
00126
00127 DQMStore::MEMap::iterator i, e;
00128 net_->reserveLocalSpace(store_->data_.size());
00129 for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
00130 {
00131 const MonitorElement &me = *i;
00132 fullpath.clear();
00133 fullpath += *me.data_.dirname;
00134 if (! me.data_.dirname->empty())
00135 fullpath += '/';
00136 fullpath += me.data_.objname;
00137
00138 if (filter_ && filter_->search(fullpath) < 0)
00139 continue;
00140
00141 seen.insert(fullpath);
00142 if (! me.wasUpdated())
00143 continue;
00144
00145 o.lastreq = 0;
00146 o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
00147 o.flags = me.data_.flags;
00148 o.tag = me.data_.tag;
00149 o.version = version;
00150 o.dirname = me.data_.dirname;
00151 o.objname = me.data_.objname;
00152 assert(o.rawdata.empty());
00153 assert(o.scalar.empty());
00154 assert(o.qdata.empty());
00155
00156
00157 switch (me.kind())
00158 {
00159 case MonitorElement::DQM_KIND_INT:
00160 case MonitorElement::DQM_KIND_REAL:
00161 case MonitorElement::DQM_KIND_STRING:
00162 me.packScalarData(o.scalar, "");
00163 break;
00164
00165 default:
00166 {
00167 TBufferFile buffer(TBufferFile::kWrite);
00168 buffer.WriteObject(me.object_);
00169 if (me.reference_)
00170 buffer.WriteObject(me.reference_);
00171 else
00172 buffer.WriteObjectAny(0, 0);
00173 o.rawdata.resize(buffer.Length());
00174 memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
00175 DQMNet::packQualityData(o.qdata, me.data_.qreports);
00176 break;
00177 }
00178 }
00179
00180
00181 net_->updateLocalObject(o);
00182 DQMNet::DataBlob().swap(o.rawdata);
00183 std::string().swap(o.scalar);
00184 std::string().swap(o.qdata);
00185 updated = true;
00186 }
00187
00188
00189 if (net_->removeLocalExcept(seen))
00190 updated = true;
00191
00192
00193 net_->unlock();
00194
00195
00196 if (updated)
00197 net_->sendLocalChanges();
00198 }
00199
00200 store_->reset();
00201 lastFlush_ = lat::Time::current().ns() * 1e-9;
00202 }
00203
00204
00205 void
00206 DQMService::shutdown(void)
00207 {
00208
00209 if (net_)
00210 net_->shutdown();
00211 }
00212
00213 #endif // !WITHOUT_CMS_FRAMEWORK