CMS 3D CMS Logo

Public Member Functions | Private Member Functions | Private Attributes

DQMService Class Reference

#include <DQMService.h>

List of all members.

Public Member Functions

 DQMService (const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
void flush (const edm::Event &, const edm::EventSetup &)
void flushStandalone ()
 ~DQMService (void)

Private Member Functions

void shutdown (void)

Private Attributes

lat::Regexp * filter_
double lastFlush_
DQMBasicNet * net_
double publishFrequency_
DQMStore * store_

Detailed Description

A bridge to update the DQM network layer at the end of every event.

Definition at line 13 of file DQMService.h.


Constructor & Destructor Documentation

DQMService::DQMService ( const edm::ParameterSet & pset,
edm::ActivityRegistry & ar 
)

Definition at line 47 of file DQMService.cc.

References alignCSCRings::e, Exception, align_tpl::filter, filter_, flush(), edm::ParameterSet::getUntrackedParameter(), query::host, query::port, publishFrequency_, releaseDQMAccess(), releaseDQMAccessM(), restrictDQMAccess(), restrictDQMAccessM(), shutdown(), edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostModule(), edm::ActivityRegistry::watchPostProcessEvent(), edm::ActivityRegistry::watchPostSource(), edm::ActivityRegistry::watchPostSourceConstruction(), edm::ActivityRegistry::watchPreModule(), edm::ActivityRegistry::watchPreSource(), and edm::ActivityRegistry::watchPreSourceConstruction().

  // Construct the service: grab the DQMStore, hook into the framework's
  // activity signals, read the (untracked) configuration and, if a collector
  // host is configured, start the network layer.
  //
  // Untracked parameters read from pset (with defaults):
  //   collectorHost    (string, "")    - empty means no network is created
  //   collectorPort    (int, 9090)     - must be > 0 for the network to start
  //   verbose          (bool, false)   - passed to DQMBasicNet::debug()
  //   publishFrequency (double, 5.0)   - compared in flushStandalone() against
  //                                      elapsed time expressed in seconds
  //   filter           (string, "")    - regular expression; empty means no filter
  //
  // Throws cms::Exception("DQMService") if 'filter' is not a valid regular
  // expression.
  : store_(&*edm::Service<DQMStore>()),
    net_(0),
    filter_(0),
    lastFlush_(0),
    publishFrequency_(5.0)
{
  // Bracket source construction, source execution and module execution with
  // the restrict/release DQM access callbacks.
  // NOTE(review): what "restricting" access means is defined elsewhere
  // (restrictDQMAccess*/releaseDQMAccess* are not shown here) - confirm there.
  ar.watchPreSourceConstruction(&restrictDQMAccessM);
  ar.watchPostSourceConstruction(&releaseDQMAccessM);
  ar.watchPreSource(&restrictDQMAccess);
  ar.watchPostSource(&releaseDQMAccess);
  ar.watchPreModule(&restrictDQMAccessM);
  ar.watchPostModule(&releaseDQMAccessM);
  // Publish after every processed event and shut the network down at end of job.
  ar.watchPostProcessEvent(this, &DQMService::flush);
  ar.watchPostEndJob(this, &DQMService::shutdown);

  std::string host = pset.getUntrackedParameter<std::string>("collectorHost", ""); 
  int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
  bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
  // The current member value (5.0 from the initializer list) is the default.
  publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
  std::string filter = pset.getUntrackedParameter<std::string>("filter", "");

  // Only create and start the network layer when a collector is configured;
  // otherwise net_ stays null and flushStandalone()/shutdown() skip all
  // network work.
  if (host != "" && port > 0)
  {
    // NOTE(review): allocated with new; no matching delete appears in the
    // destructor or shutdown() as listed here - confirm ownership.
    net_ = new DQMBasicNet;
    net_->debug(verbose);
    net_->updateToCollector(host, port);
    net_->start();
  }

  // Compile the optional path filter. Both failure modes (an invalid pattern
  // reported via valid(), or a lat::Error thrown by the regexp library) are
  // surfaced to the caller as cms::Exception.
  if (! filter.empty())
  {
    try
    {
      // NOTE(review): filter_ is also a raw new with no visible delete; if the
      // exception below is thrown the object is not released - confirm.
      filter_ = new lat::Regexp(filter);
      if (! filter_->valid())
        throw cms::Exception("DQMService")
          << "Invalid 'filter' parameter value '" << filter << "':"
          << " bad regular expression syntax at character "
          << filter_->errorOffset() << ": " << filter_->errorMessage();
      filter_->study();
    }
    catch (lat::Error &e)
    {
      throw cms::Exception("DQMService")
        << "Invalid regular expression 'filter' parameter value '"
        << filter << "': " << e.explain();
    }
  }
}
DQMService::~DQMService ( void  )

Definition at line 98 of file DQMService.cc.

References shutdown().

{
  // Make sure the network layer is told to shut down even if the framework's
  // post-end-job callback never ran.
  // NOTE(review): net_ and filter_ are not deleted here - confirm whether
  // that is intentional.
  shutdown();
}

Member Function Documentation

void DQMService::flush ( const edm::Event & ,
const edm::EventSetup &  
)

Definition at line 205 of file DQMService.cc.

References flushStandalone().

Referenced by DQMService().

{
  // Framework callback registered with watchPostProcessEvent() in the
  // constructor. The event and event-setup arguments are unused; all work is
  // done by the framework-independent flushStandalone().
  flushStandalone();
}
void DQMService::flushStandalone ( )

Definition at line 106 of file DQMService.cc.

References cond::rpcobimon::current, DQMStore::data_, MonitorElement::data_, DQMNet::CoreObject::dirname, MonitorElement::DQM_KIND_INT, MonitorElement::DQM_KIND_REAL, MonitorElement::DQM_KIND_STRING, DQMNet::dqmhash(), alignCSCRings::e, filter_, DQMNet::CoreObject::flags, DQMNet::Object::hash, i, MonitorElement::kind(), lastFlush_, DQMNet::Object::lastreq, python::connectstrParser::o, MonitorElement::object_, DQMNet::CoreObject::objname, DQMNet::packQualityData(), MonitorElement::packScalarData(), publishFrequency_, DQMNet::Object::qdata, DQMNet::CoreObject::qreports, DQMNet::Object::rawdata, MonitorElement::reference_, DQMStore::reset(), DQMNet::Object::scalar, store_, DQMNet::CoreObject::tag, DQMNet::CoreObject::version, BeamSplash_cfg::version, and MonitorElement::wasUpdated().

Referenced by evf::iDie::doFlush(), and flush().

{
  // Push updated monitor elements from the DQMStore into the network layer's
  // local cache, drop cached entries that no longer exist (or no longer pass
  // the filter), then reset the store. Calls arriving sooner than
  // publishFrequency_ seconds after the previous flush return immediately.

  // Avoid sending updates excessively often.
  // 'version' is the current time in nanoseconds; it is used both for the
  // rate limit below and as the version stamp of every object sent.
  uint64_t version = lat::Time::current().ns();
  double vtime = version * 1e-9; // same instant, in seconds
  if (vtime - lastFlush_ < publishFrequency_)
    return;

  // OK, send an update.
  // net_ is null when no collector host was configured; in that case only
  // the store reset and timestamp update at the bottom are performed.
  if (net_)
  {
    DQMNet::Object o;            // scratch object, reused for every element
    std::set<std::string> seen;  // full paths of all elements passing the filter
    std::string fullpath;        // reused buffer for "<dirname>/<objname>"

    // Lock the network layer so we can modify the data.
    // NOTE(review): there is no scope guard; if anything between lock() and
    // unlock() throws, the lock is not released here - confirm acceptable.
    net_->lock();
    bool updated = false;

    // Find updated contents and update the network cache.
    DQMStore::MEMap::iterator i, e;
    net_->reserveLocalSpace(store_->data_.size());
    for (i = store_->data_.begin(), e = store_->data_.end(); i != e; ++i)
    {
      const MonitorElement &me = *i;
      // Build the full path; no separator is added for an empty directory name.
      fullpath.clear();
      fullpath += *me.data_.dirname;
      if (! me.data_.dirname->empty())
        fullpath += '/';
      fullpath += me.data_.objname;

      // Skip elements whose path does not match the configured filter
      // (a negative search() result is treated as "no match"). Skipped
      // elements are not recorded in 'seen'.
      if (filter_ && filter_->search(fullpath) < 0)
        continue;

      // Record the path before the wasUpdated() check so that existing but
      // unchanged elements are still passed to removeLocalExcept() below.
      seen.insert(fullpath);
      if (! me.wasUpdated())
        continue;

      o.lastreq = 0;
      o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
      o.flags = me.data_.flags;
      o.tag = me.data_.tag;
      o.version = version;
      o.dirname = me.data_.dirname;
      o.objname = me.data_.objname;
      // The payload buffers must be empty here: they are swapped with fresh
      // empty containers at the end of every iteration that fills them.
      assert(o.rawdata.empty());
      assert(o.scalar.empty());
      assert(o.qdata.empty());

      // Pack object and reference, scalar and quality data.
      switch (me.kind())
      {
      // Integer, real and string elements are packed into o.scalar only.
      case MonitorElement::DQM_KIND_INT:
      case MonitorElement::DQM_KIND_REAL:
      case MonitorElement::DQM_KIND_STRING:
        me.packScalarData(o.scalar, "");
        break;

      // Every other kind is serialised through a ROOT TBufferFile: the object
      // itself followed by its reference object, or by a null object when
      // there is no reference. Quality report data goes into o.qdata.
      default:
        {
          TBufferFile buffer(TBufferFile::kWrite);
          buffer.WriteObject(me.object_);
          if (me.reference_)
            buffer.WriteObject(me.reference_);
          else
            buffer.WriteObjectAny(0, 0);
          o.rawdata.resize(buffer.Length());
          memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
          DQMNet::packQualityData(o.qdata, me.data_.qreports);
          break;
        }
      }

      // Update.
      net_->updateLocalObject(o);
      // Swap each payload buffer with an empty temporary so the scratch
      // object starts the next iteration empty (see the asserts above).
      DQMNet::DataBlob().swap(o.rawdata);
      std::string().swap(o.scalar);
      std::string().swap(o.qdata);
      updated = true;
    }

    // Find removed contents and clear the network cache.
    // A true return value is treated as "something changed".
    if (net_->removeLocalExcept(seen))
      updated = true;

    // Unlock the network layer.
    net_->unlock();

    // Tell network to flush if we updated something.
    if (updated)
      net_->sendLocalChanges();
  }

  // Done regardless of whether a network exists: reset the store and restart
  // the rate-limit clock from the time the flush finished (a fresh timestamp,
  // not the 'version' taken at entry).
  store_->reset();
  lastFlush_ = lat::Time::current().ns() * 1e-9;


}
void DQMService::shutdown ( void  ) [private]

Definition at line 213 of file DQMService.cc.

Referenced by DQMService(), and ~DQMService().

{
  // If we have a network, let it go.
  // This is reached both from the post-end-job callback registered in the
  // constructor and from the destructor, so net_->shutdown() can be invoked
  // more than once on the same object.
  // NOTE(review): presumably a repeated shutdown() is harmless - confirm
  // against DQMBasicNet/DQMNet.
  if (net_)
    net_->shutdown();
}

Member Data Documentation

lat::Regexp* DQMService::filter_ [private]

Definition at line 26 of file DQMService.h.

Referenced by DQMService(), and flushStandalone().

double DQMService::lastFlush_ [private]

Definition at line 27 of file DQMService.h.

Referenced by flushStandalone().

DQMBasicNet* DQMService::net_ [private]

Definition at line 25 of file DQMService.h.

double DQMService::publishFrequency_ [private]

Definition at line 28 of file DQMService.h.

Referenced by DQMService(), and flushStandalone().

DQMStore* DQMService::store_ [private]

Definition at line 24 of file DQMService.h.

Referenced by flushStandalone().