CMS 3D CMS Logo

Public Member Functions | Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | Static Private Attributes

FUShmDQMOutputService Class Reference

#include <FUShmDQMOutputService.h>

Inheritance diagram for FUShmDQMOutputService:
evf::ServiceWeb

List of all members.

Public Member Functions

bool attachToShm ()
void defaultWebPage (xgi::Input *in, xgi::Output *out)
bool detachFromShm ()
 FUShmDQMOutputService (const edm::ParameterSet &pset, edm::ActivityRegistry &actReg)
void postEndJobProcessing ()
void postEndLumi (edm::LuminosityBlock const &, edm::EventSetup const &)
void postEventProcessing (const edm::Event &event, const edm::EventSetup &eventSetup)
void postSourceConstructionProcessing (const edm::ModuleDescription &modDesc)
void preBeginRun (const edm::RunID &runID, const edm::Timestamp &timestamp)
void publish (xdata::InfoSpace *)
void reset ()
void setAttachToShm ()
 ~FUShmDQMOutputService (void)

Protected Member Functions

void findMonitorElements (DQMEvent::TObjectTable &toTable, std::string folderPath)

Protected Attributes

DQMStorebei

Private Member Functions

void writeShmDQMData (DQMEventMsgBuilder const &dqmMsgBuilder)

Private Attributes

bool attach_
int compressionLevel_
edm::StreamDQMDeserializer deserializeWorker_
const std::string dqm
unsigned int firstLumiSectionSeen_
char host_name_ [255]
bool initializationIsNeeded_
const std::string input
int lumiSectionInterval_
unsigned int lumiSectionOfPreviousUpdate_
double lumiSectionsPerUpdate_
std::vector< char > messageBuffer_
xdata::UnsignedInteger32 nbUpdates_
edm::StreamDQMSerializer serializeWorker_
evf::FUShmBuffershmBuffer_
double timeInSecSinceUTC_
uint32 updateNumber_
bool useCompression_

Static Private Attributes

static uint32 fuGuidValue_ = 0
static bool fuIdsInitialized_ = false

Detailed Description

This class is responsible for collecting data quality monitoring (DQM) information, packaging it in DQMEvent objects, and writing out the data to shared memory for the Resource Broker to send to the Storage Manager

27-Dec-2006 - KAB - Initial Implementation 31-Mar-2007 - HWKC - modification for shared memory usage

Id:
FUShmDQMOutputService.h,v 1.13 2012/05/02 15:02:19 smorovic Exp

Definition at line 32 of file FUShmDQMOutputService.h.


Constructor & Destructor Documentation

FUShmDQMOutputService::FUShmDQMOutputService ( const edm::ParameterSet pset,
edm::ActivityRegistry actReg 
)

FUShmDQMOutputService constructor.

Definition at line 56 of file FUShmDQMOutputService.cc.

References compressionLevel_, fuGuidValue_, fuIdsInitialized_, edm::ParameterSet::getParameter(), edm::ParameterSet::getUntrackedParameter(), host_name_, initializationIsNeeded_, dttmaxenums::L, lumiSectionInterval_, lumiSectionsPerUpdate_, messageBuffer_, postEndJobProcessing(), postEndLumi(), postSourceConstructionProcessing(), preBeginRun(), AlCaHLTBitMon_QueryRunRegistry::string, timeInSecSinceUTC_, edm::Guid::toString(), useCompression_, edm::ActivityRegistry::watchPostEndJob(), edm::ActivityRegistry::watchPostEndLumi(), edm::ActivityRegistry::watchPostSourceConstruction(), and edm::ActivityRegistry::watchPreBeginRun().

  : evf::ServiceWeb("FUShmDQMOutputService")
  , updateNumber_(0)
  , shmBuffer_(0)
  , nbUpdates_(0)
  , input("INPUT")
  , dqm("DQM")
  , attach_(false)
{

  // specify the routine to be called after event processing.  This routine
  // will be used to periodically fetch monitor elements from the DQM
  // backend and write out to shared memory for sending to the storage manager.
  actReg.watchPostEndLumi(this, &FUShmDQMOutputService::postEndLumi);

  // specify the routine to be called after the input source has been
  // constructed.  This routine will be used to initialize our connection
  // to the storage manager and any other needed setup.??
  actReg.watchPostSourceConstruction(this,
         &FUShmDQMOutputService::postSourceConstructionProcessing);

  // specify the routine to be called when a run begins
  actReg.watchPreBeginRun(this, &FUShmDQMOutputService::preBeginRun);

  // specify the routine to be called when the job has finished.  It will
  // be used to disconnect from the SM, if needed, and any other shutdown
  // tasks that are needed.??
  actReg.watchPostEndJob(this, &FUShmDQMOutputService::postEndJobProcessing);

  // set internal values from the parameter set
  int initialSize =
    pset.getUntrackedParameter<int>("initialMessageBufferSize", 1000000);
  messageBuffer_.resize(initialSize);
  lumiSectionsPerUpdate_ = pset.getParameter<double>("lumiSectionsPerUpdate");
  // for the moment, only support a number of lumi sections per update >= 1
  if (lumiSectionsPerUpdate_ <= 1.0) {lumiSectionsPerUpdate_ = 1.0;}
  initializationIsNeeded_ = true;
  useCompression_ = pset.getParameter<bool>("useCompression");
  compressionLevel_ = pset.getParameter<int>("compressionLevel");
  // the default for lumiSectionInterval_ is 0, meaning get it from the event
  // otherwise we get a fake one that should match the fake lumi block
  // for events (if any) as long as the time between lumi blocks is larger
  // than the time difference between creating this service and the 
  // FUShmOutputModule event output module
  lumiSectionInterval_ =
    pset.getUntrackedParameter<int>("lumiSectionInterval", 0); // seconds
  if (lumiSectionInterval_ < 1) {lumiSectionInterval_ = 0;}

  // for fake test luminosity sections
  struct timeval now;
  struct timezone dummyTZ;
  gettimeofday(&now, &dummyTZ);
  // we will count lumi section numbers from this time
  timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);

  int got_host = gethostname(host_name_, sizeof(host_name_));
  if(got_host != 0) strcpy(host_name_, "noHostNameFoundOrTooLong");

  if (! fuIdsInitialized_) {
    fuIdsInitialized_ = true;

    edm::Guid guidObj(true);
    std::string guidString = guidObj.toString();
    //std::cout << "DQMOutput GUID string = " << guidString << std::endl;

    uLong crc = crc32(0L, Z_NULL, 0);
    Bytef* buf = (Bytef*)guidString.data();
    crc = crc32(crc, buf, guidString.length());
    fuGuidValue_ = crc;

    //std::cout << "DQMOutput GUID value = 0x" << std::hex << fuGuidValue_ << std::endl;
  }
}
FUShmDQMOutputService::~FUShmDQMOutputService ( void  )

FUShmDQMOutputService destructor.

Definition at line 134 of file FUShmDQMOutputService.cc.

References shmBuffer_.

{
  shmdt(shmBuffer_);
}

Member Function Documentation

bool FUShmDQMOutputService::attachToShm ( )

Definition at line 421 of file FUShmDQMOutputService.cc.

References evf::FUShmBuffer::getShmBuffer(), and shmBuffer_.

Referenced by postEndLumi().

{
  if(0==shmBuffer_) {
    shmBuffer_ = evf::FUShmBuffer::getShmBuffer();
    if (0==shmBuffer_) {
      edm::LogError("FUDQMShmOutputService")<<"Failed to attach to shared memory";
      return false;
    }
    return true;    
  }
  return false;

}
void FUShmDQMOutputService::defaultWebPage ( xgi::Input *  in,
xgi::Output *  out 
) [virtual]

Reimplemented from evf::ServiceWeb.

Definition at line 139 of file FUShmDQMOutputService.cc.

{
}
bool FUShmDQMOutputService::detachFromShm ( )

Definition at line 437 of file FUShmDQMOutputService.cc.

References shmBuffer_.

{
  if(0!=shmBuffer_) {
    shmdt(shmBuffer_);
    shmBuffer_ = 0;
  }
  return true;
}
void FUShmDQMOutputService::findMonitorElements ( DQMEvent::TObjectTable toTable,
std::string  folderPath 
) [protected]

Finds all of the monitor elements under the specified folder, including those in subdirectories.

Definition at line 348 of file FUShmDQMOutputService.cc.

References bei, DQMStore::dirExists(), DQMStore::getContents(), MonitorElement::getRootObject(), DQMStore::getSubdirs(), customizeTrackingMonitorSeedNumber::idx, NULL, DQMStore::setCurrentFolder(), and AlCaHLTBitMon_QueryRunRegistry::string.

Referenced by postEndLumi().

{
  if (bei == NULL) {return;}

  // fetch the monitor elements in the specified directory
  std::vector<MonitorElement *> localMEList = bei->getContents(folderPath);
  //MonitorElementRootFolder* folderPtr = bei->getDirectory(folderPath);

  // add the MEs that should be updated to the table
  std::vector<TObject *> updateTOList;
  for (int idx = 0; idx < (int) localMEList.size(); idx++) {
    MonitorElement *mePtr = localMEList[idx];
    //    if (mePtr->wasUpdated()) { // @@EM send updated and not (to be revised)
    updateTOList.push_back(mePtr->getRootObject());
      //    }
  }
  if (updateTOList.size() > 0) {
    toTable[folderPath] = updateTOList;
  }

  // find the subdirectories in this folder
  // (checking if the directory exists is probably overkill,
  // but we really don't want to create new folders using
  // setCurrentFolder())
  if (bei->dirExists(folderPath)) {
    bei->setCurrentFolder(folderPath);
    std::vector<std::string> subDirList = bei->getSubdirs();

    // loop over the subdirectories, find the MEs in each one
    std::vector<std::string>::const_iterator dirIter;
    for (dirIter = subDirList.begin(); dirIter != subDirList.end(); dirIter++) {
      std::string subDirPath = (*dirIter);
      findMonitorElements(toTable, subDirPath);
    }
  }
}
void FUShmDQMOutputService::postEndJobProcessing ( )

Callback to be used after the end job operation has finished. It takes care of any necessary cleanup.

Definition at line 338 of file FUShmDQMOutputService.cc.

References initializationIsNeeded_.

Referenced by FUShmDQMOutputService().

{
  // since the service is not destroyed we need to take care of endjob items here
  initializationIsNeeded_ = true;
}
void FUShmDQMOutputService::postEndLumi ( edm::LuminosityBlock const &  lb,
edm::EventSetup const &  es 
)

Definition at line 154 of file FUShmDQMOutputService.cc.

References edm::StreamDQMSerializer::adler32_chksum(), attach_, attachToShm(), bei, edm::StreamDQMSerializer::bufferPointer(), DQMStore::cd(), compressionLevel_, filterCSVwithJSON::copy, edm::StreamDQMSerializer::currentEventSize(), edm::StreamDQMSerializer::currentSpaceUsed(), TrackerOfflineValidation_Dqm_cff::dirName, dqm, edm::LuminosityBlockBase::endTime(), Exception, findMonitorElements(), firstLumiSectionSeen_, fuGuidValue_, edm::getReleaseVersion(), DQMStore::getSubdirs(), host_name_, initializationIsNeeded_, input, edm::LuminosityBlockBase::luminosityBlock(), lumiSectionInterval_, lumiSectionOfPreviousUpdate_, messageBuffer_, nbUpdates_, NULL, cppFunctionSkipper::operator, edm::LuminosityBlockBase::run(), edm::StreamDQMSerializer::serializeDQMEvent(), serializeWorker_, evf::MicroStateService::setMicroState(), alcazmumu_cfi::src, AlCaHLTBitMon_QueryRunRegistry::string, timeInSecSinceUTC_, updateNumber_, useCompression_, and writeShmDQMData().

Referenced by FUShmDQMOutputService().

{
  if (attach_) attachToShm();
  attach_=false;
  
  evf::MicroStateService *mss = 0;
  try{
    mss = edm::Service<evf::MicroStateService>().operator->();
    if(mss) mss->setMicroState(&dqm);
  }
  catch(...) { 
    edm::LogError("FUShmDQMOutputService")<< "exception when trying to get service MicroStateService";
  }
  

  // fake the luminosity section if we don't want to use the real one
  unsigned int thisLumiSection = 0;
  if(lumiSectionInterval_ == 0)
    thisLumiSection = lb.luminosityBlock();
  else {
    // match the code in Event output module to get the same (almost) lumi number
    struct timeval now;
    struct timezone dummyTZ;
    gettimeofday(&now, &dummyTZ);
    double timeInSec = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0) - timeInSecSinceUTC_;
    // what about overflows?
    if(lumiSectionInterval_ > 0) thisLumiSection = static_cast<uint32>(timeInSec/lumiSectionInterval_);
  }

   // special handling for the first event
  if (initializationIsNeeded_) {
    initializationIsNeeded_ = false;
    lumiSectionOfPreviousUpdate_ = thisLumiSection;
    firstLumiSectionSeen_ = thisLumiSection;

    // for when a run(job) had ended and we start a new run(job)
    // for fake test luminosity sections
    struct timeval now;
    struct timezone dummyTZ;
    gettimeofday(&now, &dummyTZ);
    // we will count lumi section numbers from this time
    timeInSecSinceUTC_ = static_cast<double>(now.tv_sec) + (static_cast<double>(now.tv_usec)/1000000.0);
  }

  //  std::cout << getpid() << ": :" //<< gettid() 
  //        << ":DQMOutputService check if have to send update for lumiSection " << thisLumiSection << std::endl;
  if(thisLumiSection%4!=0) 
    {
//       std::cout << getpid() << ": :" //<< gettid() 
//              << ":DQMOutputService skipping update for lumiSection " << thisLumiSection << std::endl;
      if(mss) mss->setMicroState(&input);
      return;
    }
//   std::cout << getpid() << ": :" //<< gettid() 
//          << ":DQMOutputService sending update for lumiSection " << thisLumiSection << std::endl;
  // Calculate the update ID and lumi ID for this update
  // fullUpdateRatio and fullLsDelta are unused. comment out the calculation.
  //int fullLsDelta = (int) (thisLumiSection - firstLumiSectionSeen_);
  //double fullUpdateRatio = ((double) fullLsDelta) / lumiSectionsPerUpdate_;
  // this is the update number starting from zero

  // this is the actual luminosity section number for the beginning lumi section of this update
  unsigned int lumiSectionTag = thisLumiSection;

  // retry the lookup of the backend interface, if needed
  if (bei == NULL) {
    bei = edm::Service<DQMStore>().operator->();
  }

  // to go any further, a backend interface pointer is crucial
  if (bei == NULL) {
    throw cms::Exception("postEventProcessing", "FUShmDQMOutputService")
      << "Unable to lookup the DQMStore service!\n";
  }

  // determine the top level folders (these will be used for grouping
  // monitor elements into DQM Events)
  std::vector<std::string> topLevelFolderList;
  //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
  bei->cd();
  //std::cout << "### SenderService, pwd = " << bei->pwd() << std::endl;
  topLevelFolderList = bei->getSubdirs();

  // find the monitor elements under each top level folder (including
  // subdirectories)
  std::map< std::string, DQMEvent::TObjectTable > toMap;
  std::vector<std::string>::const_iterator dirIter;
  for (dirIter = topLevelFolderList.begin();
       dirIter != topLevelFolderList.end();
       dirIter++) {
    std::string dirName = *dirIter;
    DQMEvent::TObjectTable toTable;

    // find the MEs
    findMonitorElements(toTable, dirName);

    // store the list in the map
    toMap[dirName] = toTable;
  }

  // create a DQMEvent message for each top-level folder
  // and write each to the shared memory
  for (dirIter = topLevelFolderList.begin();
       dirIter != topLevelFolderList.end();
       dirIter++) {
    std::string dirName = *dirIter;
    DQMEvent::TObjectTable toTable = toMap[dirName];
    if (toTable.size() == 0) {continue;}

    // serialize the monitor element data
    serializeWorker_.serializeDQMEvent(toTable, useCompression_,
                                       compressionLevel_);

    // resize the message buffer, if needed 
    unsigned int srcSize = serializeWorker_.currentSpaceUsed();
    unsigned int newSize = srcSize + 50000;  // allow for header
    if (messageBuffer_.size() < newSize) messageBuffer_.resize(newSize);

    // create the message
    DQMEventMsgBuilder dqmMsgBuilder(&messageBuffer_[0], messageBuffer_.size(),
                                     lb.run(), lb.luminosityBlock(),
                                     lb.endTime(),
                                     lumiSectionTag, updateNumber_,
                                     (uint32)serializeWorker_.adler32_chksum(),
                                     host_name_,
                                     edm::getReleaseVersion(), dirName,
                                     toTable);

    // copy the serialized data into the message
    unsigned char* src = serializeWorker_.bufferPointer();
    std::copy(src,src + srcSize, dqmMsgBuilder.eventAddress());
    dqmMsgBuilder.setEventLength(srcSize);
    if (useCompression_) {
      dqmMsgBuilder.setCompressionFlag(serializeWorker_.currentEventSize());
    }

    // write the filter unit UUID and PID into the message
    dqmMsgBuilder.setFUProcessId(getpid());
    dqmMsgBuilder.setFUGuid(fuGuidValue_);

    // send the message
    writeShmDQMData(dqmMsgBuilder);
//     std::cout << getpid() << ": :" // << gettid() 
//            << ":DQMOutputService DONE sending update for lumiSection " << thisLumiSection << std::endl;
    if(mss) mss->setMicroState(&input);

  }
  
  // reset monitor elements that have requested it
  // TODO - enable this
  //bei->doneSending(true, true);
  
  // update the "previous" lumi section
  lumiSectionOfPreviousUpdate_ = thisLumiSection;
  nbUpdates_++;
  updateNumber_++;
}
void FUShmDQMOutputService::postEventProcessing ( const edm::Event event,
const edm::EventSetup eventSetup 
)
void FUShmDQMOutputService::postSourceConstructionProcessing ( const edm::ModuleDescription moduleDesc)

Callback to be used after the input source has been constructed. It takes care of any intialization that is needed by this service.

Definition at line 316 of file FUShmDQMOutputService.cc.

References bei, and cppFunctionSkipper::operator.

Referenced by FUShmDQMOutputService().

void FUShmDQMOutputService::preBeginRun ( const edm::RunID runID,
const edm::Timestamp timestamp 
)

Callback to be used after the Run has been created by the InputSource but before any modules have seen the Run

Definition at line 326 of file FUShmDQMOutputService.cc.

References initializationIsNeeded_, nbUpdates_, and updateNumber_.

Referenced by FUShmDQMOutputService().

void FUShmDQMOutputService::publish ( xdata::InfoSpace *  is) [virtual]

Implements evf::ServiceWeb.

Definition at line 143 of file FUShmDQMOutputService.cc.

References alignCSCRings::e, Exception, and nbUpdates_.

{
  try{
    is->fireItemAvailable("nbDqmUpdates",&nbUpdates_);
  }
  catch(xdata::exception::Exception &e){
    edm::LogInfo("FUShmDQMOutputService")
      << " exception when publishing to InfoSpace "; 
  } 
}
void FUShmDQMOutputService::reset ( void  ) [inline]

Definition at line 53 of file FUShmDQMOutputService.h.

References initializationIsNeeded_, and nbUpdates_.

void FUShmDQMOutputService::setAttachToShm ( )

Definition at line 417 of file FUShmDQMOutputService.cc.

References attach_.

                                           {
  attach_=true;
}
void FUShmDQMOutputService::writeShmDQMData ( DQMEventMsgBuilder const &  dqmMsgBuilder) [private]

Writes the specified DQM event message to shared memory.

Definition at line 389 of file FUShmDQMOutputService.cc.

References DQMEventMsgView::eventNumberAtUpdate(), fuGuidValue_, dttmaxenums::L, run_regression::ret, DQMEventMsgView::runNumber(), shmBuffer_, findQualityFiles::size, DQMEventMsgBuilder::size(), DQMEventMsgBuilder::startAddress(), AlCaHLTBitMon_QueryRunRegistry::string, DQMEventMsgView::topFolderName(), and evf::FUShmBuffer::writeDqmEventData().

Referenced by postEndLumi().

{
  // fetch the location and size of the message buffer
  unsigned char* buffer = (unsigned char*) dqmMsgBuilder.startAddress();
  unsigned int size = dqmMsgBuilder.size();

  // fetch the run, event, and folder number for addition to the I2O fragments
  DQMEventMsgView dqmMsgView(buffer);
  unsigned int runid = dqmMsgView.runNumber();
  unsigned int eventid = dqmMsgView.eventNumberAtUpdate();

  // We need to generate an unique 32 bit ID from the top folder name
  std::string topFolder = dqmMsgView.topFolderName();
  uLong crc = crc32(0L, Z_NULL, 0);
  Bytef* buf = (Bytef*)topFolder.data();
  crc = crc32(crc, buf, topFolder.length());

  if(!shmBuffer_) {
    edm::LogError("FUDQMShmOutputService") 
      << " Error writing to shared memory as shm is not available";
  } else {
    bool ret = shmBuffer_->writeDqmEventData(runid, eventid, (unsigned int)crc,
                                             getpid(), fuGuidValue_, buffer, size);
    if(!ret) edm::LogError("FUShmDQMOutputService") << " Error with writing data to ShmBuffer";
  }

}

Member Data Documentation

Definition at line 90 of file FUShmDQMOutputService.h.

Referenced by postEndLumi(), and setAttachToShm().

Definition at line 75 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), and postEndLumi().

Definition at line 77 of file FUShmDQMOutputService.h.

const std::string FUShmDQMOutputService::dqm [private]

Definition at line 85 of file FUShmDQMOutputService.h.

Referenced by postEndLumi().

Definition at line 71 of file FUShmDQMOutputService.h.

Referenced by postEndLumi().

Definition at line 88 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), postEndLumi(), and writeShmDQMData().

bool FUShmDQMOutputService::fuIdsInitialized_ = false [static, private]

Initialize the static variables for the filter unit indentifiers.

Definition at line 87 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService().

char FUShmDQMOutputService::host_name_[255] [private]

Definition at line 82 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), and postEndLumi().

const std::string FUShmDQMOutputService::input [private]

Definition at line 84 of file FUShmDQMOutputService.h.

Referenced by postEndLumi().

Definition at line 67 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), and postEndLumi().

Definition at line 70 of file FUShmDQMOutputService.h.

Referenced by postEndLumi().

Definition at line 68 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService().

std::vector<char> FUShmDQMOutputService::messageBuffer_ [private]

Definition at line 66 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), and postEndLumi().

xdata::UnsignedInteger32 FUShmDQMOutputService::nbUpdates_ [private]

Definition at line 81 of file FUShmDQMOutputService.h.

Referenced by postEndLumi(), preBeginRun(), publish(), and reset().

Definition at line 76 of file FUShmDQMOutputService.h.

Referenced by postEndLumi().

Definition at line 72 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), and postEndLumi().

Definition at line 69 of file FUShmDQMOutputService.h.

Referenced by postEndLumi(), and preBeginRun().

Definition at line 74 of file FUShmDQMOutputService.h.

Referenced by FUShmDQMOutputService(), and postEndLumi().