00001 #include "DQMServices/Core/src/DQMService.h"
00002 #include "DQMServices/Core/src/DQMRootBuffer.h"
00003 #include "DQMServices/Core/interface/DQMNet.h"
00004 #include "DQMServices/Core/interface/DQMStore.h"
00005 #include "DQMServices/Core/interface/DQMScope.h"
00006 #include "DQMServices/Core/interface/MonitorElement.h"
00007 #include "FWCore/ServiceRegistry/interface/Service.h"
00008 #include "classlib/utils/Regexp.h"
00009 #include "classlib/utils/Error.h"
00010 #include <pthread.h>
00011 #include <iostream>
00012 #include <string>
00013 #include <memory>
00014
00015
00016 static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER;
00017
00020 DQMScope::DQMScope(void)
00021 { pthread_mutex_lock(&s_mutex); }
00022
00024 DQMScope::~DQMScope(void)
00025 { pthread_mutex_unlock(&s_mutex); }
00026
00028 static void
00029 restrictDQMAccess(void)
00030 { pthread_mutex_lock(&s_mutex); }
00031
00032 static void
00033 restrictDQMAccessM(const edm::ModuleDescription &)
00034 { restrictDQMAccess(); }
00035
00037 static void
00038 releaseDQMAccess(void)
00039 { pthread_mutex_unlock(&s_mutex); }
00040
00041 static void
00042 releaseDQMAccessM(const edm::ModuleDescription &)
00043 { releaseDQMAccess(); }
00044
00045
00046 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
00047 : store_(&*edm::Service<DQMStore>()),
00048 net_(0),
00049 filter_(0),
00050 lastFlush_(0),
00051 publishFrequency_(5.0)
00052 {
00053 ar.watchPreSourceConstruction(&restrictDQMAccessM);
00054 ar.watchPostSourceConstruction(&releaseDQMAccessM);
00055 ar.watchPreSource(&restrictDQMAccess);
00056 ar.watchPostSource(&releaseDQMAccess);
00057 ar.watchPreModule(&restrictDQMAccessM);
00058 ar.watchPostModule(&releaseDQMAccessM);
00059 ar.watchPostProcessEvent(this, &DQMService::flush);
00060 ar.watchPostEndJob(this, &DQMService::shutdown);
00061
00062 std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
00063 int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
00064 bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
00065 publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
00066 std::string filter = pset.getUntrackedParameter<std::string>("filter", "");
00067
00068 if (host != "" && port > 0)
00069 {
00070 net_ = new DQMBasicNet;
00071 net_->debug(verbose);
00072 net_->updateToCollector(host, port);
00073 net_->start();
00074 }
00075
00076 if (! filter.empty())
00077 {
00078 try
00079 {
00080 filter_ = new lat::Regexp(filter);
00081 if (! filter_->valid())
00082 throw cms::Exception("DQMService")
00083 << "Invalid 'filter' parameter value '" << filter << "':"
00084 << " bad regular expression syntax at character "
00085 << filter_->errorOffset() << ": " << filter_->errorMessage();
00086 filter_->study();
00087 }
00088 catch (lat::Error &e)
00089 {
00090 throw cms::Exception("DQMService")
00091 << "Invalid regular expression 'filter' parameter value '"
00092 << filter << "': " << e.explain();
00093 }
00094 }
00095 }
00096
00097 DQMService::~DQMService(void)
00098 {
00099 shutdown();
00100 }
00101
00102
00103
00104
00105 void
00106 DQMService::flush(const edm::Event &, const edm::EventSetup &)
00107 {
00108
00109 uint64_t version = lat::Time::current().ns();
00110 double vtime = version * 1e-9;
00111 if (vtime - lastFlush_ < publishFrequency_)
00112 return;
00113 lastFlush_ = vtime;
00114
00115
00116 if (net_)
00117 {
00118
00119 net_->lock();
00120 bool updated = false;
00121
00122
00123 DQMStore::MEMap::iterator i, e;
00124 for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
00125 {
00126 MonitorElement &me = i->second;
00127 if (! me.wasUpdated())
00128 continue;
00129
00130 if (filter_ && filter_->search(me.data_.name) < 0)
00131 continue;
00132
00133 assert(me.data_.object);
00134
00135 DQMNet::Object o;
00136 o.version = version;
00137 o.name = me.data_.name;
00138 o.tags = me.data_.tags;
00139 o.object = 0;
00140 o.reference = 0;
00141 o.flags = me.data_.flags;
00142 o.lastreq = 0;
00143
00144 DQMRootBuffer buffer (DQMRootBuffer::kWrite);
00145 buffer.WriteObject(me.data_.object);
00146 if (me.data_.reference)
00147 buffer.WriteObject(me.data_.reference);
00148 else
00149 buffer.WriteObjectAny(0, 0);
00150
00151
00152 DQMNet::QReports::iterator qi, qe;
00153 for (qi = me.data_.qreports.begin(), qe = me.data_.qreports.end(); qi != qe; ++qi)
00154 {
00155 TObjString s (me.qualityTagString(*qi).c_str());
00156 buffer.WriteObject(&s);
00157 }
00158
00159
00160 o.rawdata.resize(buffer.Length());
00161 memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
00162 net_->updateLocalObject(o);
00163 updated = true;
00164 }
00165
00166
00167 std::vector<std::string>::iterator ri, re;
00168 for (ri = store_->removed_.begin(), re = store_->removed_.end(); ri != re; ++ri, updated = true)
00169 net_->removeLocalObject(*ri);
00170
00171
00172 net_->unlock();
00173
00174
00175 if (updated)
00176 net_->sendLocalChanges();
00177 }
00178
00179 store_->reset();
00180 }
00181
00182
00183 void
00184 DQMService::shutdown(void)
00185 {
00186
00187 if (net_)
00188 net_->shutdown();
00189 }