CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

DQMService Class Reference

#include <DQMService.h>

List of all members.

Public Member Functions

 DQMService (const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
 ~DQMService (void)

Private Member Functions

void flush (const edm::Event &, const edm::EventSetup &)
void shutdown (void)

Private Attributes

lat::Regexp * filter_
double lastFlush_
DQMBasicNetnet_
double publishFrequency_
DQMStorestore_

Detailed Description

A bridge to udpate the DQM network layer at the end of every event.

Definition at line 13 of file DQMService.h.


Constructor & Destructor Documentation

DQMService::DQMService ( const edm::ParameterSet pset,
edm::ActivityRegistry ar 
)

Definition at line 47 of file DQMService.cc.

References ExpressReco_HICollisions_FallBack::e, Exception, align_tpl::filter, filter_, flush(), edm::ParameterSet::getUntrackedParameter(), query::host, query::port, publishFrequency_, releaseDQMAccess(), releaseDQMAccessM(), restrictDQMAccess(), restrictDQMAccessM(), shutdown(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostModule(), edm::ActivityRegistry::watchPostProcessEvent(), edm::ActivityRegistry::watchPostSource(), edm::ActivityRegistry::watchPostSourceConstruction(), edm::ActivityRegistry::watchPreModule(), edm::ActivityRegistry::watchPreSource(), and edm::ActivityRegistry::watchPreSourceConstruction().

  : store_(&*edm::Service<DQMStore>()),
    net_(0),
    filter_(0),
    lastFlush_(0),
    publishFrequency_(5.0)
{
  ar.watchPreSourceConstruction(&restrictDQMAccessM);
  ar.watchPostSourceConstruction(&releaseDQMAccessM);
  ar.watchPreSource(&restrictDQMAccess);
  ar.watchPostSource(&releaseDQMAccess);
  ar.watchPreModule(&restrictDQMAccessM);
  ar.watchPostModule(&releaseDQMAccessM);
  ar.watchPostProcessEvent(this, &DQMService::flush);
  ar.watchPostEndJob(this, &DQMService::shutdown);

  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", ""); 
  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
  std::string filter = pset.getUntrackedParameter<std::string>("filter", "");

  if (host != "" && port > 0)
  {
    net_ = new DQMBasicNet;
    net_->debug(verbose);
    net_->updateToCollector(host, port);
    net_->start();
  }

  if (! filter.empty())
  {
    try
    {
      filter_ = new lat::Regexp(filter);
      if (! filter_->valid())
        throw cms::Exception("DQMService")
          << "Invalid 'filter' parameter value '" << filter << "':"
          << " bad regular expression syntax at character "
          << filter_->errorOffset() << ": " << filter_->errorMessage();
      filter_->study();
    }
    catch (lat::Error &e)
    {
      throw cms::Exception("DQMService")
        << "Invalid regular expression 'filter' parameter value '"
        << filter << "': " << e.explain();
    }
  }
}
DQMService::~DQMService ( void  )

Definition at line 98 of file DQMService.cc.

References shutdown().

{
  shutdown();
}

Member Function Documentation

void DQMService::flush ( const edm::Event ,
const edm::EventSetup  
) [private]

Definition at line 107 of file DQMService.cc.

References cond::rpcobimon::current, DQMStore::data_, MonitorElement::data_, DQMNet::CoreObject::dirname, MonitorElement::DQM_KIND_INT, MonitorElement::DQM_KIND_REAL, MonitorElement::DQM_KIND_STRING, DQMNet::dqmhash(), ExpressReco_HICollisions_FallBack::e, filter_, DQMNet::CoreObject::flags, DQMNet::Object::hash, i, MonitorElement::kind(), lastFlush_, DQMNet::Object::lastreq, connectstrParser::o, MonitorElement::object_, DQMNet::CoreObject::objname, DQMNet::packQualityData(), MonitorElement::packScalarData(), publishFrequency_, DQMNet::Object::qdata, DQMNet::CoreObject::qreports, DQMNet::Object::rawdata, MonitorElement::reference_, DQMStore::reset(), DQMNet::Object::scalar, store_, DQMNet::CoreObject::tag, DQMNet::CoreObject::version, AlCaRecoCosmics_cfg::version, and MonitorElement::wasUpdated().

Referenced by DQMService().

{
  // Avoid sending updates excessively often.
  uint64_t version = lat::Time::current().ns();
  double vtime = version * 1e-9;
  if (vtime - lastFlush_ < publishFrequency_)
    return;

  // OK, send an update.
  if (net_)
  {
    DQMNet::Object o;
    std::set<std::string> seen;
    std::string fullpath;

    // Lock the network layer so we can modify the data.
    net_->lock();
    bool updated = false;

    // Find updated contents and update the network cache.
    DQMStore::MEMap::iterator i, e;
    net_->reserveLocalSpace(store_->data_.size());
    for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
    {
      const MonitorElement &me = *i;
      fullpath.clear();
      fullpath += *me.data_.dirname;
      if (! me.data_.dirname->empty())
        fullpath += '/';
      fullpath += me.data_.objname;

      if (filter_ && filter_->search(fullpath) < 0)
        continue;

      seen.insert(fullpath);
      if (! me.wasUpdated())
        continue;

      o.lastreq = 0;
      o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
      o.flags = me.data_.flags;
      o.tag = me.data_.tag;
      o.version = version;
      o.dirname = me.data_.dirname;
      o.objname = me.data_.objname;
      assert(o.rawdata.empty());
      assert(o.scalar.empty());
      assert(o.qdata.empty());

      // Pack object and reference, scalar and quality data.
      switch (me.kind())
      {
      case MonitorElement::DQM_KIND_INT:
      case MonitorElement::DQM_KIND_REAL:
      case MonitorElement::DQM_KIND_STRING:
        me.packScalarData(o.scalar, "");
        break;

      default:
        {
          TBufferFile buffer(TBufferFile::kWrite);
          buffer.WriteObject(me.object_);
          if (me.reference_)
            buffer.WriteObject(me.reference_);
          else
            buffer.WriteObjectAny(0, 0);
          o.rawdata.resize(buffer.Length());
          memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
          DQMNet::packQualityData(o.qdata, me.data_.qreports);
          break;
        }
      }

      // Update.
      net_->updateLocalObject(o);
      DQMNet::DataBlob().swap(o.rawdata);
      std::string().swap(o.scalar);
      std::string().swap(o.qdata);
      updated = true;
    }

    // Find removed contents and clear the network cache.
    if (net_->removeLocalExcept(seen))
      updated = true;

    // Unlock the network layer.
    net_->unlock();

    // Tell network to flush if we updated something.
    if (updated)
      net_->sendLocalChanges();
  }

  store_->reset();
  lastFlush_ = lat::Time::current().ns() * 1e-9;
}
void DQMService::shutdown ( void  ) [private]

Definition at line 206 of file DQMService.cc.

Referenced by DQMService(), and ~DQMService().

{
  // If we have a network, let it go.
  if (net_)
    net_->shutdown();
}

Member Data Documentation

lat::Regexp* DQMService::filter_ [private]

Definition at line 25 of file DQMService.h.

Referenced by DQMService(), and flush().

double DQMService::lastFlush_ [private]

Definition at line 26 of file DQMService.h.

Referenced by flush().

Definition at line 24 of file DQMService.h.

Definition at line 27 of file DQMService.h.

Referenced by DQMService(), and flush().

Definition at line 23 of file DQMService.h.

Referenced by flush().