Go to the documentation of this file.
00001 // $Id:,v 1.93 2008/12/19 23:32:36 biery Exp $
00003 #include <iostream>
00004 #include <iomanip>
00005 #include <sstream>
00006 #include <vector>
00007 #include <sys/stat.h>
00009 #include "EventFilter/StorageManager/interface/StorageManager.h"
00010 #include "EventFilter/StorageManager/interface/ConsumerPipe.h"
00011 #include "EventFilter/StorageManager/interface/ProgressMarker.h"
00012 #include "EventFilter/StorageManager/interface/Configurator.h"
00013 #include "EventFilter/StorageManager/interface/Parameter.h"
00014 #include "EventFilter/StorageManager/interface/FUProxy.h"
00016 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00017 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00018 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00019 #include "EventFilter/Utilities/interface/ParameterSetRetriever.h"
00021 #include "FWCore/Utilities/interface/DebugMacros.h"
00022 #include "FWCore/ServiceRegistry/interface/ServiceToken.h"
00023 #include "FWCore/ServiceRegistry/interface/Service.h"
00024 #include "FWCore/RootAutoLibraryLoader/interface/RootAutoLibraryLoader.h"
00025 #include "FWCore/PluginManager/interface/PluginManager.h"
00026 #include "FWCore/PluginManager/interface/standard.h"
00027 #include "FWCore/ParameterSet/interface/PythonProcessDesc.h"
00029 #include "IOPool/Streamer/interface/MsgHeader.h"
00030 #include "IOPool/Streamer/interface/InitMessage.h"
00031 #include "IOPool/Streamer/interface/OtherMessage.h"
00032 #include "IOPool/Streamer/interface/ConsRegMessage.h"
00033 #include "IOPool/Streamer/interface/HLTInfo.h"
00034 #include "IOPool/Streamer/interface/Utilities.h"
00035 #include "IOPool/Streamer/interface/StreamerInputSource.h"
00037 #include "xcept/tools.h"
00039 #include "i2o/Method.h"
00040 #include "i2o/utils/AddressMap.h"
00042 #include "toolbox/mem/Pool.h"
00044 #include "xcept/tools.h"
00046 #include "xgi/Method.h"
00048 #include "xoap/SOAPEnvelope.h"
00049 #include "xoap/SOAPBody.h"
00050 #include "xoap/domutils.h"
00052 #include "xdata/InfoSpaceFactory.h"
00054 #include "boost/lexical_cast.hpp"
00055 #include "boost/algorithm/string/case_conv.hpp"
00056 #include "cgicc/Cgicc.h"
00058 #include <sys/statfs.h>
00059 #include "zlib.h"
// Declarations of two free functions in namespace stor whose definitions are
// not part of this listing.
// NOTE(review): judging by the names they report whether an exception
// occurred and the reason text for it -- confirm against the definitions.
00061 namespace stor {
00062   extern bool getSMFC_exceptionStatus();
00063   extern std::string getSMFC_reason4Exception();
00064 }
// File-scope using-directives: the code below refers to names such as
// string and ostringstream without a namespace qualifier.
00066 using namespace edm;
00067 using namespace std;
00068 using namespace stor;
00071 static void deleteSMBuffer(void* Ref)
00072 {
00073   // release the memory pool buffer
00074   // once the fragment collector is done with it
00075   stor::FragEntry* entry = (stor::FragEntry*)Ref;
00076   toolbox::mem::Reference *ref=(toolbox::mem::Reference*)entry->buffer_object_;
00077   ref->release();
00078 }
// Convert an int to its decimal text form by streaming it into a string
// buffer and returning the accumulated characters.
std::string smutil_itos(int i)
{
  std::ostringstream formatted;
  formatted << i;
  return formatted.str();
}
// StorageManager constructor.
//
// Gives the data members their defaults (initializer list and body),
// initializes the state machine, publishes configuration and monitoring
// variables in the application infospace, binds the I2O message callbacks and
// the xgi web pages, creates the performance meter and derives sourceId_ from
// the application instance number.
//
// s - application stub, forwarded to the xdaq::Application base class.
// The exception specification allows xdaq::exception::Exception.
00088 StorageManager::StorageManager(xdaq::ApplicationStub * s)
00089   throw (xdaq::exception::Exception) :
00090   xdaq::Application(s),
00091   fsm_(this), 
00092   reasonForFailedState_(),
00093   ah_(0), 
00094   exactFileSizeTest_(false),
00095   fileClosingTestInterval_(5),
00096   pushMode_(false), 
00097   reconfigurationRequested_(false),
00098   collateDQM_(false),
00099   archiveDQM_(false),
00100   archiveIntervalDQM_(0),
00101   filePrefixDQM_("/tmp/DQM"),
00102   purgeTimeDQM_(DEFAULT_PURGE_TIME),
00103   readyTimeDQM_(DEFAULT_READY_TIME),
00104   useCompressionDQM_(true),
00105   compressionLevelDQM_(1),
00106   mybuffer_(7000000),
00107   fairShareES_(false),
00108   connectedRBs_(0), 
00109   storedEvents_(0), 
00110   receivedEvents_(0), 
00111   receivedErrorEvents_(0), 
00112   dqmRecords_(0), 
00113   closedFiles_(0), 
00114   openFiles_(0), 
00115   receivedVolume_(0.),
00116   storedVolume_(0.),
00117   progressMarker_(ProgressMarker::instance()->idle()),
00118   lastEventSeen_(0),
00119   lastErrorEventSeen_(0),
00120   sm_cvs_version_("$Id:,v 1.93 2008/12/19 23:32:36 biery Exp $ $Name: V04-02-00-01 $")
00121 {  
00122   LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
        // ah_ starts as 0 in the initializer list; the handler object is
        // created here and deleted in the destructor.
00124   ah_   = new edm::AssertHandler();
00125   fsm_.initialize<StorageManager>(this);
00127   // Careful with next line: state machine fsm_ has to be setup first
00128   setupFlashList();
00130   xdata::InfoSpace *ispace = getApplicationInfoSpace();
        // Publish run and monitoring variables in the application infospace.
        // The first argument is the exported name, the second the variable.
        // Note: "receivedEventsForOutMod" is the exported name of the member
        // receivedEventsFromOutMod_ (For vs. From).
00132   ispace->fireItemAvailable("STparameterSet",&offConfig_);
00133   ispace->fireItemAvailable("runNumber",     &runNumber_);
00134   ispace->fireItemAvailable("stateName",     fsm_.stateName());
00135   ispace->fireItemAvailable("connectedRBs",  &connectedRBs_);
00136   ispace->fireItemAvailable("storedEvents",  &storedEvents_);
00137   ispace->fireItemAvailable("receivedEvents",&receivedEvents_);
00138   ispace->fireItemAvailable("receivedErrorEvents",&receivedErrorEvents_);
00139   ispace->fireItemAvailable("dqmRecords",    &dqmRecords_);
00140   ispace->fireItemAvailable("closedFiles",   &closedFiles_);
00141   ispace->fireItemAvailable("openFiles",     &openFiles_);
00142   ispace->fireItemAvailable("fileList",      &fileList_);
00143   ispace->fireItemAvailable("eventsInFile",  &eventsInFile_);
00144   ispace->fireItemAvailable("storedEventsInStream",  &storedEventsInStream_);
00145   ispace->fireItemAvailable("receivedEventsForOutMod",  &receivedEventsFromOutMod_);
00146   ispace->fireItemAvailable("fileSize",      &fileSize_);
00147   ispace->fireItemAvailable("namesOfStream",      &namesOfStream_);
00148   ispace->fireItemAvailable("namesOfOutMod",      &namesOfOutMod_);
00150   ispace->fireItemAvailable("rcmsStateListener", fsm_.rcmsStateListener());
00151   ispace->fireItemAvailable("foundRcmsStateListener", fsm_.foundRcmsStateListener());
00152   // 21-Nov-2008, KAB: the findRcmsStateListener call needs to go after the
00153   // calls to add the RCMS vars to the application infospace.
00154   fsm_.findRcmsStateListener();
        // Register this object as retrieve-listener for "closedFiles" and as
        // changed-listener for "STparameterSet".
00156   ispace->addItemRetrieveListener("closedFiles", this);
00157   ispace->addItemChangedListener("STparameterSet", this);
00159   // Bind specific messages to functions
00160   i2o::bind(this,
00161             &StorageManager::receiveRegistryMessage,
00162             I2O_SM_PREAMBLE,
00163             XDAQ_ORGANIZATION_ID);
00164   i2o::bind(this,
00165             &StorageManager::receiveDataMessage,
00166             I2O_SM_DATA,
00167             XDAQ_ORGANIZATION_ID);
00168   i2o::bind(this,
00169             &StorageManager::receiveErrorDataMessage,
00170             I2O_SM_ERROR,
00171             XDAQ_ORGANIZATION_ID);
00172   /* no longer used it seems? Don't delete yet
00173   i2o::bind(this,
00174             &StorageManager::receiveOtherMessage,
00175             I2O_SM_OTHER,
00176             XDAQ_ORGANIZATION_ID);
00177   */
00178   i2o::bind(this,
00179             &StorageManager::receiveDQMMessage,
00180             I2O_SM_DQM,
00181             XDAQ_ORGANIZATION_ID);
00183   // Bind web interface
00184   xgi::bind(this,&StorageManager::defaultWebPage,       "Default");
00185   xgi::bind(this,&StorageManager::css,                  "styles.css");
00186   xgi::bind(this,&StorageManager::rbsenderWebPage,      "rbsenderlist");
00187   xgi::bind(this,&StorageManager::streamerOutputWebPage,"streameroutput");
00188   xgi::bind(this,&StorageManager::eventdataWebPage,     "geteventdata");
00189   xgi::bind(this,&StorageManager::headerdataWebPage,    "getregdata");
00190   xgi::bind(this,&StorageManager::consumerWebPage,      "registerConsumer");
00191   xgi::bind(this,&StorageManager::consumerListWebPage,  "consumerList");
00192   xgi::bind(this,&StorageManager::DQMeventdataWebPage,  "getDQMeventdata");
00193   xgi::bind(this,&StorageManager::DQMconsumerWebPage,   "registerDQMConsumer");
00194   xgi::bind(this,&StorageManager::eventServerWebPage,   "EventServerStats");
        // Members that are given their start values here rather than in the
        // initializer list.
00195   receivedFrames_ = 0;
00196   pool_is_set_    = 0;
00197   pool_           = 0;
00198   nLogicalDisk_   = 0;
00199   pushmode2proxy_ = false;
00201   // Variables needed for streamer file writing
00202   ispace->fireItemAvailable("pushMode2Proxy", &pushmode2proxy_);
00203   ispace->fireItemAvailable("collateDQM",     &collateDQM_);
00204   ispace->fireItemAvailable("archiveDQM",     &archiveDQM_);
00205   ispace->fireItemAvailable("archiveIntervalDQM",  &archiveIntervalDQM_);
00206   ispace->fireItemAvailable("purgeTimeDQM",   &purgeTimeDQM_);
00207   ispace->fireItemAvailable("readyTimeDQM",   &readyTimeDQM_);
00208   ispace->fireItemAvailable("filePrefixDQM",       &filePrefixDQM_);
00209   ispace->fireItemAvailable("useCompressionDQM",   &useCompressionDQM_);
00210   ispace->fireItemAvailable("compressionLevelDQM", &compressionLevelDQM_);
00211   ispace->fireItemAvailable("nLogicalDisk",        &nLogicalDisk_);
        // smParameter_ is a local variable here despite the trailing
        // underscore. It is obtained through Configurator::instance() and
        // supplies the start values of the file-writing settings below, which
        // are then published in the infospace.
00213   boost::shared_ptr<stor::Parameter> smParameter_ = stor::Configurator::instance()->getParameter();
00214   fileCatalog_        = smParameter_ -> fileCatalog(); 
00215   fileName_           = smParameter_ -> fileName();
00216   filePath_           = smParameter_ -> filePath();
00217   maxFileSize_        = smParameter_ -> maxFileSize();
00218   setupLabel_         = smParameter_ -> setupLabel();
00219   highWaterMark_      = smParameter_ -> highWaterMark();
00220   lumiSectionTimeOut_ = smParameter_ -> lumiSectionTimeOut();
00221   exactFileSizeTest_  = smParameter_ -> exactFileSizeTest();
00223   ispace->fireItemAvailable("fileCatalog",        &fileCatalog_);
00224   ispace->fireItemAvailable("fileName",           &fileName_);
00225   ispace->fireItemAvailable("filePath",           &filePath_);
00226   ispace->fireItemAvailable("maxFileSize",        &maxFileSize_);
00227   ispace->fireItemAvailable("setupLabel",         &setupLabel_);
00228   ispace->fireItemAvailable("highWaterMark",      &highWaterMark_);
00229   ispace->fireItemAvailable("lumiSectionTimeOut", &lumiSectionTimeOut_);
00230   ispace->fireItemAvailable("exactFileSizeTest",  &exactFileSizeTest_);
00231   ispace->fireItemAvailable("fileClosingTestInterval",&fileClosingTestInterval_);
00233   // added for Event Server
        // Each setting is given its default and then published under the
        // same name; fairShareES is currently not published.
00234   maxESEventRate_ = 100.0;  // hertz
00235   ispace->fireItemAvailable("maxESEventRate",&maxESEventRate_);
00236   maxESDataRate_ = 1024.0;  // MB/sec
00237   ispace->fireItemAvailable("maxESDataRate",&maxESDataRate_);
00238   activeConsumerTimeout_ = 60;  // seconds
00239   ispace->fireItemAvailable("activeConsumerTimeout",&activeConsumerTimeout_);
00240   idleConsumerTimeout_ = 120;  // seconds
00241   ispace->fireItemAvailable("idleConsumerTimeout",&idleConsumerTimeout_);
00242   consumerQueueSize_ = 5;
00243   ispace->fireItemAvailable("consumerQueueSize",&consumerQueueSize_);
00244   //ispace->fireItemAvailable("fairShareES",&fairShareES_);
00245   DQMmaxESEventRate_ = 1.0;  // hertz
00246   ispace->fireItemAvailable("DQMmaxESEventRate",&DQMmaxESEventRate_);
00247   DQMactiveConsumerTimeout_ = 60;  // seconds
00248   ispace->fireItemAvailable("DQMactiveConsumerTimeout",&DQMactiveConsumerTimeout_);
00249   DQMidleConsumerTimeout_ = 120;  // seconds
00250   ispace->fireItemAvailable("DQMidleConsumerTimeout",&DQMidleConsumerTimeout_);
00251   DQMconsumerQueueSize_ = 15;
00252   ispace->fireItemAvailable("DQMconsumerQueueSize",&DQMconsumerQueueSize_);
00253   esSelectedHLTOutputModule_ = "out4DQM";
00254   ispace->fireItemAvailable("esSelectedHLTOutputModule",&esSelectedHLTOutputModule_);
00256   // for performance measurements
        // samples_ and period4samples_ are published first and receive their
        // default values a few lines further down.
00257   ispace->fireItemAvailable("receivedSamples4Stats",&samples_);
00258   ispace->fireItemAvailable("receivedPeriod4Stats",&period4samples_);
00259   samples_          = 1000; // measurements every 60MB (about) is the default
00260   period4samples_   = 5;
00261   instantBandwidth_ = 0.;
00262   instantRate_      = 0.;
00263   instantLatency_   = 0.;
00264   totalSamples_     = 0;
00265   duration_         = 0.;
00266   meanBandwidth_    = 0.;
00267   meanRate_         = 0.;
00268   meanLatency_      = 0.;
00270   instantBandwidth2_= 0.;
00271   instantRate2_     = 0.;
00272   instantLatency2_  = 0.;
00273   totalSamples2_    = 0;
00274   duration2_        = 0.;
00275   meanBandwidth2_   = 0.;
00276   meanRate2_        = 0.;
00277   meanLatency2_     = 0.;
00279   maxBandwidth_     = 0.;
00280   minBandwidth_     = 999999.;
00282   maxBandwidth2_    = 0.;
00283   minBandwidth2_    = 999999.; 
        // The performance meter is created here, configured with the two
        // values set above, and deleted in the destructor.
00285   pmeter_ = new stor::SMPerformanceMeter();
00286   pmeter_->init(samples_, period4samples_);
        // sourceId_ is the application instance number as text; it is also
        // handed to the Parameter object via setSmInstance().
        // xmlClass is only referenced by the commented-out line below.
00288   string        xmlClass = getApplicationDescriptor()->getClassName();
00289   unsigned long instance = getApplicationDescriptor()->getInstance();
00290   ostringstream sourcename;
00291   // sourcename << xmlClass << "_" << instance;
00292   sourcename << instance;
00293   sourceId_ = sourcename.str();
00294   smParameter_ -> setSmInstance(sourceId_);
        // Start with empty monitoring containers; the two vectors get
        // capacity reserved up front.
00296   storedEventsInStream_.reserve(20);
00297   storedEventsInStream_.clear();
00298   receivedEventsFromOutMod_.reserve(10);
00299   receivedEventsFromOutMod_.clear();
00300   receivedEventsMap_.clear();
00301   avEventSizeMap_.clear();
00302   avCompressRatioMap_.clear();
00303   modId2ModOutMap_.clear();
00304   storedEventsMap_.clear();
00306   // need the line below so that deserializeRegistry can run
00307   // in order to compare two registries (cannot compare byte-for-byte) (if we keep this)
00308   // need line below anyway in case we deserialize DQMEvents for collation
00309   edm::RootAutoLibraryLoader::enable();
00311   // set application icon for hyperdaq
00312   getApplicationDescriptor()->setAttribute("icon", "/evf/images/smicon.jpg");
00313 }
00315 StorageManager::~StorageManager()
00316 {
00317   delete ah_;
00318   delete pmeter_;
00319 }
00321 xoap::MessageReference
00322 StorageManager::ParameterGet(xoap::MessageReference message)
00323   throw (xoap::exception::Exception)
00324 {
00325   connectedRBs_.value_ = smrbsenders_.size();
00326   return Application::ParameterGet(message);
00327 }
00331 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref)
00332 {
00333   // get the memory pool pointer for statistics if not already set
00334   if(pool_is_set_ == 0)
00335   {
00336     pool_ = ref->getBuffer()->getPool();
00337     pool_is_set_ = 1;
00338   }
00340   I2O_MESSAGE_FRAME         *stdMsg  = (I2O_MESSAGE_FRAME*) ref->getDataLocation();
00343   FDEBUG(10) << "StorageManager: Received registry message from HLT " << msg->hltURL
00344              << " application " << msg->hltClassName << " id " << msg->hltLocalId
00345              << " instance " << msg->hltInstance << " tid " << msg->hltTid
00346              << " rbBufferID " << msg->rbBufferID << " outModID " << msg->outModID
00347              << " fuProcID " << msg->fuProcID  << " fuGUID 0x" << std::hex
00348              << msg->fuGUID << std::dec << std::endl;
00349   FDEBUG(10) << "StorageManager: registry size " << msg->dataSize << "\n";
00351   int len = msg->dataSize;
00353   // *** check the Storage Manager is in the Ready or Enabled state first!
00354   if(fsm_.stateName()->toString() != "Enabled" && fsm_.stateName()->toString() != "Ready" )
00355   {
00356     LOG4CPLUS_ERROR(this->getApplicationLogger(),
00357                        "Received INIT message but not in Ready/Enabled state! Current state = "
00358                        << fsm_.stateName()->toString() << " INIT from " << msg->hltURL
00359                        << " application " << msg->hltClassName);
00360     // just release the memory at least - is that what we want to do?
00361     ref->release();
00362     return;
00363   }
00365   // 04-Nov-2008, HWKC and KAB - make local copy of I2O message header so
00366   // that we can use that information even after the Reference is released.
00367   // Do *NOT* use the dataPtr() method of the localMsgCopy because this
00368   // copy operation doesn't include the data, only the header!
00370   const char* from = static_cast<const char*>(ref->getDataLocation());
00371   unsigned int msize = sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME);
00372   char* dest = (char*) &localMsgCopy;
00373   std::copy(from, from+msize, dest);
00374   localMsgCopy.dataSize = msize;
00376   // If running with local transfers, a chain of I2O frames when posted only has the
00377   // head frame sent. So a single frame can complete a chain for local transfers.
00378   // We need to test for this. Must be head frame, more than one frame
00379   // and next pointer must exist.
00380   int is_local_chain = 0;
00381   if(msg->frameCount == 0 && msg->numFrames > 1 && ref->getNextReference())
00382   {
00383     // this looks like a chain of frames (local transfer)
00384     toolbox::mem::Reference *head = ref;
00385     toolbox::mem::Reference *next = 0;
00386     // best to check the complete chain just in case!
00387     unsigned int tested_frames = 1;
00388     next = head;
00389     while((next=next->getNextReference())!=0) ++tested_frames;
00390     FDEBUG(10) << "StorageManager: INIT Head frame has " << tested_frames-1
00391                << " linked frames out of " << msg->numFrames-1 << std::endl;
00392     if(msg->numFrames == tested_frames)
00393     {
00394       // found a complete linked chain from the leading frame
00395       is_local_chain = 1;
00396       FDEBUG(10) << "StorageManager: Leading frame contains a complete linked chain"
00397                  << " - must be local transfer" << std::endl;
00398       FDEBUG(10) << "StorageManager: Breaking the chain" << std::endl;
00399       // break the chain and feed them to the fragment collector
00400       next = head;
00402       for(int iframe=0; iframe <(int)localMsgCopy.numFrames; ++iframe)
00403       {
00404          toolbox::mem::Reference *thisref=next;
00405          next = thisref->getNextReference();
00406          thisref->setNextReference(0);
00407          I2O_MESSAGE_FRAME          *thisstdMsg = (I2O_MESSAGE_FRAME*)thisref->getDataLocation();
00408          I2O_SM_PREAMBLE_MESSAGE_FRAME *thismsg = (I2O_SM_PREAMBLE_MESSAGE_FRAME*)thisstdMsg;
00410          // 04-Nov-2008, need to make a local copy of I2O header information.
00411          // Do *NOT* use the dataPtr() method of the thisMsgCopy because this
00412          // copy operation doesn't include the data, only the header!
00413          I2O_SM_PREAMBLE_MESSAGE_FRAME thisMsgCopy;
00414          from = static_cast<const char*>(thisref->getDataLocation());
00415          msize = sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME);
00416          dest = (char*) &thisMsgCopy;
00417          std::copy(from, from+msize, dest);
00418          thisMsgCopy.dataSize = msize;
00420          EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
00421          int thislen = thismsg->dataSize;
00422          // ***  must give it the 1 of N for this fragment (starts from 0 in i2o header)
00423          new (b.buffer()) stor::FragEntry(thisref, (char*)(thismsg->dataPtr()), thislen,
00424                                           thismsg->frameCount+1, thismsg->numFrames, Header::INIT, 
00425                                           0, thismsg->hltTid, thismsg->outModID,
00426                                           thismsg->fuProcID, thismsg->fuGUID);
00427          std::copy(thismsg->hltURL, thismsg->hltURL+MAX_I2O_SM_URLCHARS,
00428                    static_cast<stor::FragEntry*>(b.buffer())->hltURL_);
00429          std::copy(thismsg->hltClassName, thismsg->hltClassName+MAX_I2O_SM_URLCHARS,
00430                    static_cast<stor::FragEntry*>(b.buffer())->hltClassName_);
00431          static_cast<stor::FragEntry*>(b.buffer())->hltLocalId_ = thismsg->hltLocalId;
00432          static_cast<stor::FragEntry*>(b.buffer())->hltInstance_ = thismsg->hltInstance;
00433          static_cast<stor::FragEntry*>(b.buffer())->hltTid_ = thismsg->hltTid;
00434          static_cast<stor::FragEntry*>(b.buffer())->rbBufferID_ = thismsg->rbBufferID;
00435          b.commit(sizeof(stor::FragEntry));
00437          ++receivedFrames_;
00438          // for bandwidth performance measurements
00439          // Following is wrong for the last frame because frame sent is
00440          // is actually larger than the size taken by actual data
00441          unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME)
00442                                          +thislen;
00443          addMeasurement(actualFrameSize);
00445          // add this output module to the monitoring
00446          bool alreadyStoredOutMod = false;
00447          uint32 moduleId = thisMsgCopy.outModID;
00448          std::string dmoduleLabel("ID_" + smutil_itos(thisMsgCopy.outModID));
00449          if(modId2ModOutMap_.find(moduleId) != modId2ModOutMap_.end()) alreadyStoredOutMod = true;
00450          if(!alreadyStoredOutMod) {
00451            modId2ModOutMap_.insert(std::make_pair(moduleId,dmoduleLabel));
00452            receivedEventsMap_.insert(std::make_pair(dmoduleLabel,0));
00453            avEventSizeMap_.insert(std::make_pair(dmoduleLabel,
00454                    boost::shared_ptr<ForeverAverageCounter>(new ForeverAverageCounter()) ));
00455            avCompressRatioMap_.insert(std::make_pair(dmoduleLabel,
00456                    boost::shared_ptr<ForeverAverageCounter>(new ForeverAverageCounter()) ));
00457          }
00458       }
00460     } else {
00461       // should never get here!
00462       FDEBUG(10) << "StorageManager: INIT Head frame has fewer linked frames "
00463                  << "than expected: abnormal error! " << std::endl;
00464       LOG4CPLUS_ERROR(this->getApplicationLogger(),"INIT Head frame has fewer linked frames" 
00465                       << " than expected: abnormal error! ");
00466     }
00467   }
00469   if (is_local_chain == 0) 
00470   {
00471     // put pointers into fragment collector queue
00472     EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
00473     // must give it the 1 of N for this fragment (starts from 0 in i2o header)
00474     new (b.buffer()) stor::FragEntry(ref, (char*)(msg->dataPtr()), len,
00475                                      msg->frameCount+1, msg->numFrames, Header::INIT,
00476                                      0, msg->hltTid, msg->outModID,
00477                                      msg->fuProcID, msg->fuGUID);
00478     std::copy(msg->hltURL, msg->hltURL+MAX_I2O_SM_URLCHARS,
00479               static_cast<stor::FragEntry*>(b.buffer())->hltURL_);
00480     std::copy(msg->hltClassName, msg->hltClassName+MAX_I2O_SM_URLCHARS,
00481               static_cast<stor::FragEntry*>(b.buffer())->hltClassName_);
00482     static_cast<stor::FragEntry*>(b.buffer())->hltLocalId_ = msg->hltLocalId;
00483     static_cast<stor::FragEntry*>(b.buffer())->hltInstance_ = msg->hltInstance;
00484     static_cast<stor::FragEntry*>(b.buffer())->hltTid_ = msg->hltTid;
00485     static_cast<stor::FragEntry*>(b.buffer())->rbBufferID_ = msg->rbBufferID;
00486     b.commit(sizeof(stor::FragEntry));
00487     // Frame release is done in the deleter.
00488     ++receivedFrames_;
00489     // for bandwidth performance measurements
00490     unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_PREAMBLE_MESSAGE_FRAME)
00491                                     + len;
00492     addMeasurement(actualFrameSize);
00494     // add this output module to the monitoring
00495     bool alreadyStoredOutMod = false;
00496     uint32 moduleId = localMsgCopy.outModID;
00497     std::string dmoduleLabel("ID_" + smutil_itos(localMsgCopy.outModID));
00498     if(modId2ModOutMap_.find(moduleId) != modId2ModOutMap_.end()) alreadyStoredOutMod = true;
00499     if(!alreadyStoredOutMod) {
00500       modId2ModOutMap_.insert(std::make_pair(moduleId,dmoduleLabel));
00501       receivedEventsMap_.insert(std::make_pair(dmoduleLabel,0));
00502       avEventSizeMap_.insert(std::make_pair(dmoduleLabel,
00503           boost::shared_ptr<ForeverAverageCounter>(new ForeverAverageCounter()) ));
00504       avCompressRatioMap_.insert(std::make_pair(dmoduleLabel,
00505           boost::shared_ptr<ForeverAverageCounter>(new ForeverAverageCounter()) ));
00506     }
00507   }
00509   if (  localMsgCopy.frameCount == localMsgCopy.numFrames-1 )
00510   {
00511     string hltClassName(localMsgCopy.hltClassName);
00512     sendDiscardMessage(localMsgCopy.rbBufferID, 
00513                        localMsgCopy.hltInstance, 
00514                        I2O_FU_DATA_DISCARD,
00515                        hltClassName);
00516   }
00517 }
00519 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref)
00520 {
00521   // get the memory pool pointer for statistics if not already set
00522   if(pool_is_set_ == 0)
00523   {
00524     pool_ = ref->getBuffer()->getPool();
00525     pool_is_set_ = 1;
00526   }
00528   I2O_MESSAGE_FRAME         *stdMsg =
00529     (I2O_MESSAGE_FRAME*)ref->getDataLocation();
00530   I2O_SM_DATA_MESSAGE_FRAME *msg    =
00531     (I2O_SM_DATA_MESSAGE_FRAME*)stdMsg;
00532   FDEBUG(10)   << "StorageManager: Received data message from HLT at " << msg->hltURL 
00533                << " application " << msg->hltClassName << " id " << msg->hltLocalId
00534                << " instance " << msg->hltInstance << " tid " << msg->hltTid
00535                << " rbBufferID " << msg->rbBufferID << " outModID " << msg->outModID
00536                << " fuProcID " << msg->fuProcID  << " fuGUID 0x" << std::hex
00537                << msg->fuGUID << std::dec << std::endl;
00538   FDEBUG(10)   << "                 for run " << msg->runID << " event " << msg->eventID
00539                << " total frames = " << msg->numFrames << std::endl;
00540   FDEBUG(10)   << "StorageManager: Frame " << msg->frameCount << " of " 
00541                << msg->numFrames-1 << std::endl;
00543   int len = msg->dataSize;
00545   // check the storage Manager is in the Ready state first!
00546   if(fsm_.stateName()->toString() != "Enabled")
00547   {
00548     LOG4CPLUS_ERROR(this->getApplicationLogger(),
00549                        "Received EVENT message but not in Enabled state! Current state = "
00550                        << fsm_.stateName()->toString() << " EVENT from" << msg->hltURL
00551                        << " application " << msg->hltClassName);
00552     // just release the memory at least - is that what we want to do?
00553     ref->release();
00554     return;
00555   }
00557   // 04-Nov-2008, HWKC and KAB - make local copy of I2O message header so
00558   // that we can use that information even after the Reference is released.
00559   // Do *NOT* use the dataPtr() method of the localMsgCopy because this
00560   // copy operation doesn't include the data, only the header!
00561   I2O_SM_DATA_MESSAGE_FRAME localMsgCopy;
00562   const char* from = static_cast<const char*>(ref->getDataLocation());
00563   unsigned int msize = sizeof(I2O_SM_DATA_MESSAGE_FRAME);
00564   char* dest = (char*) &localMsgCopy;
00565   std::copy(from, from+msize, dest);
00566   localMsgCopy.dataSize = msize;
00568   // If running with local transfers, a chain of I2O frames when posted only has the
00569   // head frame sent. So a single frame can complete a chain for local transfers.
00570   // We need to test for this. Must be head frame, more than one frame
00571   // and next pointer must exist.
00572   int is_local_chain = 0;
00573   if(msg->frameCount == 0 && msg->numFrames > 1 && ref->getNextReference())
00574   {
00575     // this looks like a chain of frames (local transfer)
00576     toolbox::mem::Reference *head = ref;
00577     toolbox::mem::Reference *next = 0;
00578     // best to check the complete chain just in case!
00579     unsigned int tested_frames = 1;
00580     next = head;
00581     while((next=next->getNextReference())!=0) ++tested_frames;
00582     FDEBUG(10) << "StorageManager: Head frame has " << tested_frames-1
00583                << " linked frames out of " << msg->numFrames-1 << std::endl;
00584     if(msg->numFrames == tested_frames)
00585     {
00586       // found a complete linked chain from the leading frame
00587       is_local_chain = 1;
00588       FDEBUG(10) << "StorageManager: Leading frame contains a complete linked chain"
00589                  << " - must be local transfer" << std::endl;
00590       FDEBUG(10) << "StorageManager: Breaking the chain" << std::endl;
00591       // break the chain and feed them to the fragment collector
00592       next = head;
00594       for(int iframe=0; iframe <(int)localMsgCopy.numFrames; ++iframe)
00595       {
00596          toolbox::mem::Reference *thisref=next;
00597          next = thisref->getNextReference();
00598          thisref->setNextReference(0);
00599          I2O_MESSAGE_FRAME         *thisstdMsg = (I2O_MESSAGE_FRAME*)thisref->getDataLocation();
00600          I2O_SM_DATA_MESSAGE_FRAME *thismsg    = (I2O_SM_DATA_MESSAGE_FRAME*)thisstdMsg;
00602          // 04-Nov-2008, need to make a local copy of I2O header information.
00603          // Do *NOT* use the dataPtr() method of the thisMsgCopy because this
00604          // copy operation doesn't include the data, only the header!
00605          I2O_SM_DATA_MESSAGE_FRAME thisMsgCopy;
00606          from = static_cast<const char*>(thisref->getDataLocation());
00607          msize = sizeof(I2O_SM_DATA_MESSAGE_FRAME);
00608          dest = (char*) &thisMsgCopy;
00609          std::copy(from, from+msize, dest);
00610          thisMsgCopy.dataSize = msize;
00612          EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
00613          int thislen = thismsg->dataSize;
00614          // ***  must give it the 1 of N for this fragment (starts from 0 in i2o header)
00615          new (b.buffer()) stor::FragEntry(thisref, (char*)(thismsg->dataPtr()), thislen,
00616                                           thismsg->frameCount+1, thismsg->numFrames, Header::EVENT, 
00617                                           thismsg->runID, thismsg->eventID, thismsg->outModID,
00618                                           thismsg->fuProcID, thismsg->fuGUID);
00619          b.commit(sizeof(stor::FragEntry));
00621          ++receivedFrames_;
00622          // for bandwidth performance measurements
00623          // Following is wrong for the last frame because frame sent is
00624          // is actually larger than the size taken by actual data
00625          unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_DATA_MESSAGE_FRAME)
00626                                          +thislen;
00627          addMeasurement(actualFrameSize);
00629          // should only do this test if the first data frame from each FU?
00630          // check if run number is the same as that in Run configuration, complain otherwise !!!
00631          // this->runNumber_ comes from the RunBase class that StorageManager inherits from
00632          if(thisMsgCopy.runID != runNumber_)
00633          {
00634            LOG4CPLUS_ERROR(this->getApplicationLogger(),"Run Number from event stream = " << thisMsgCopy.runID
00635                            << " From " << thisMsgCopy.hltURL
00636                            << " Different from Run Number from configuration = " << runNumber_);
00637          }
00638          // for data sender list update
00639          // thisMsgCopy.frameCount start from 0, but in EventMsg header it starts from 1!
00640          bool isLocal = true;
00642          //update last event seen
00643          lastEventSeen_ = thisMsgCopy.eventID;
00645          int status = 
00646            smrbsenders_.updateSender4data(&thisMsgCopy.hltURL[0], &thisMsgCopy.hltClassName[0],
00647                                           thisMsgCopy.hltLocalId, thisMsgCopy.hltInstance, thisMsgCopy.hltTid,
00648                                           thisMsgCopy.runID, thisMsgCopy.eventID, thisMsgCopy.frameCount+1, thisMsgCopy.numFrames,
00649                                           thisMsgCopy.originalSize, isLocal, thisMsgCopy.outModID);
00651          //if(status == 1) ++(storedEvents_.value_);
00652          if(status == 1) {
00653            ++(receivedEvents_.value_);
00654            uint32 moduleId = thisMsgCopy.outModID;
00655            if (modId2ModOutMap_.find(moduleId) != modId2ModOutMap_.end()) {
00656              std::string moduleLabel = modId2ModOutMap_[moduleId];
00657              ++(receivedEventsMap_[moduleLabel]);
00658              avEventSizeMap_[moduleLabel]->addSample((double)thisMsgCopy.originalSize);
00659              // TODO: get the uncompressed size to find compression ratio for stats
00660            }
00661            else {
00662              LOG4CPLUS_WARN(this->getApplicationLogger(),
00663                             "StorageManager::receiveDataMessage: "
00664                             << "Unable to find output module label when "
00665                             << "accumulating statistics for event "
00666                             << thisMsgCopy.eventID << ", output module "
00667                             << thisMsgCopy.outModID << ".");
00668            }
00669          }
00671          if(status == -1) {
00672            LOG4CPLUS_ERROR(this->getApplicationLogger(),
00673                     "updateSender4data: Cannot find RB in Data Sender list!"
00674                     << " With URL "
00675                     << thisMsgCopy.hltURL << " class " << thisMsgCopy.hltClassName  << " instance "
00676                     << thisMsgCopy.hltInstance << " Tid " << thisMsgCopy.hltTid);
00677          }
00678       }
00680     } else {
00681       // should never get here!
00682       FDEBUG(10) << "StorageManager: Head frame has fewer linked frames "
00683                  << "than expected: abnormal error! " << std::endl;
00684       LOG4CPLUS_ERROR(this->getApplicationLogger(),"Head frame has fewer linked frames" 
00685                       << " than expected: abnormal error! ");
00686     }
00687   }
00689   if (is_local_chain == 0) 
00690   {
00691     // put pointers into fragment collector queue
00692     EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
00693     // must give it the 1 of N for this fragment (starts from 0 in i2o header)
00694     new (b.buffer()) stor::FragEntry(ref, (char*)(msg->dataPtr()), len,
00695                                      msg->frameCount+1, msg->numFrames, Header::EVENT, 
00696                                      msg->runID, msg->eventID, msg->outModID,
00697                                      msg->fuProcID, msg->fuGUID);
00698     b.commit(sizeof(stor::FragEntry));
00699     // Frame release is done in the deleter.
00700     ++receivedFrames_;
00701     // for bandwidth performance measurements
00702     unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_DATA_MESSAGE_FRAME)
00703                                     + len;
00704     addMeasurement(actualFrameSize);
00706     // should only do this test if the first data frame from each FU?
00707     // check if run number is the same as that in Run configuration, complain otherwise !!!
00708     // this->runNumber_ comes from the RunBase class that StorageManager inherits from
00709     if(localMsgCopy.runID != runNumber_)
00710     {
00711       LOG4CPLUS_ERROR(this->getApplicationLogger(),"Run Number from event stream = "
00712                       << localMsgCopy.runID << " From " << localMsgCopy.hltURL
00713                       << " Different from Run Number from configuration = " << runNumber_);
00714     }
00716     //update last event seen
00717     lastEventSeen_ = localMsgCopy.eventID;
00719     // for data sender list update
00720     // localMsgCopy.frameCount start from 0, but in EventMsg header it starts from 1!
00721     bool isLocal = false;
00722     int status = 
00723       smrbsenders_.updateSender4data(&localMsgCopy.hltURL[0], &localMsgCopy.hltClassName[0],
00724                                      localMsgCopy.hltLocalId, localMsgCopy.hltInstance, localMsgCopy.hltTid,
00725                                      localMsgCopy.runID, localMsgCopy.eventID, localMsgCopy.frameCount+1, localMsgCopy.numFrames,
00726                                      localMsgCopy.originalSize, isLocal, localMsgCopy.outModID);
00728     //if(status == 1) ++(storedEvents_.value_);
00729     if(status == 1) {
00730       ++(receivedEvents_.value_);
00731       uint32 moduleId = localMsgCopy.outModID;
00732       if (modId2ModOutMap_.find(moduleId) != modId2ModOutMap_.end()) {
00733         std::string moduleLabel = modId2ModOutMap_[moduleId];
00734         ++(receivedEventsMap_[moduleLabel]);
00735         avEventSizeMap_[moduleLabel]->addSample((double)localMsgCopy.originalSize);
00736         // TODO: get the uncompressed size to find compression ratio for stats
00737       }
00738       else {
00739         LOG4CPLUS_WARN(this->getApplicationLogger(),
00740                        "StorageManager::receiveDataMessage: "
00741                        << "Unable to find output module label when "
00742                        << "accumulating statistics for event "
00743                        << localMsgCopy.eventID << ", output module "
00744                        << localMsgCopy.outModID << ".");
00745       }
00746     }
00747     if(status == -1) {
00748       LOG4CPLUS_ERROR(this->getApplicationLogger(),
00749                       "updateSender4data: Cannot find RB in Data Sender list!"
00750                       << " With URL "
00751                       << localMsgCopy.hltURL << " class " << localMsgCopy.hltClassName  << " instance "
00752                       << localMsgCopy.hltInstance << " Tid " << localMsgCopy.hltTid);
00753     }
00754   }
00756   if (  localMsgCopy.frameCount == localMsgCopy.numFrames-1 )
00757     {
00758       string hltClassName(localMsgCopy.hltClassName);
00759       sendDiscardMessage(localMsgCopy.rbBufferID, 
00760                          localMsgCopy.hltInstance, 
00761                          I2O_FU_DATA_DISCARD,
00762                          hltClassName);
00763     }
00764 }
00766 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref)
00767 {
00768   // get the memory pool pointer for statistics if not already set
00769   if(pool_is_set_ == 0)
00770   {
00771     pool_ = ref->getBuffer()->getPool();
00772     pool_is_set_ = 1;
00773   }
00775   I2O_MESSAGE_FRAME         *stdMsg =
00776     (I2O_MESSAGE_FRAME*)ref->getDataLocation();
00777   I2O_SM_DATA_MESSAGE_FRAME *msg    =
00778     (I2O_SM_DATA_MESSAGE_FRAME*)stdMsg;
00779   FDEBUG(10)   << "StorageManager: Received error data message from HLT at " << msg->hltURL 
00780                << " application " << msg->hltClassName << " id " << msg->hltLocalId
00781                << " instance " << msg->hltInstance << " tid " << msg->hltTid
00782                << " rbBufferID " << msg->rbBufferID << " outModID " << msg->outModID
00783                << " fuProcID " << msg->fuProcID  << " fuGUID 0x" << std::hex
00784                << msg->fuGUID << std::dec << std::endl;
00785   FDEBUG(10)   << "                 for run " << msg->runID << " event " << msg->eventID
00786                << " total frames = " << msg->numFrames << std::endl;
00787   FDEBUG(10)   << "StorageManager: Frame " << msg->frameCount << " of " 
00788                << msg->numFrames-1 << std::endl;
00790   int len = msg->dataSize;
00792   // check the storage Manager is in the Ready state first!
00793   if(fsm_.stateName()->toString() != "Enabled")
00794   {
00795     LOG4CPLUS_ERROR(this->getApplicationLogger(),
00796                        "Received ERROR message but not in Enabled state! Current state = "
00797                        << fsm_.stateName()->toString() << " ERROR from" << msg->hltURL
00798                        << " application " << msg->hltClassName);
00799     // just release the memory at least - is that what we want to do?
00800     ref->release();
00801     return;
00802   }
00804   // 04-Nov-2008, HWKC and KAB - make local copy of I2O message header so
00805   // that we can use that information even after the Reference is released.
00806   // Do *NOT* use the dataPtr() method of the localMsgCopy because this
00807   // copy operation doesn't include the data, only the header!
00808   I2O_SM_DATA_MESSAGE_FRAME localMsgCopy;
00809   const char* from = static_cast<const char*>(ref->getDataLocation());
00810   unsigned int msize = sizeof(I2O_SM_DATA_MESSAGE_FRAME);
00811   char* dest = (char*) &localMsgCopy;
00812   std::copy(from, from+msize, dest);
00813   localMsgCopy.dataSize = msize;
00815   // If running with local transfers, a chain of I2O frames when posted only has the
00816   // head frame sent. So a single frame can complete a chain for local transfers.
00817   // We need to test for this. Must be head frame, more than one frame
00818   // and next pointer must exist.
00819   int is_local_chain = 0;
00820   if(msg->frameCount == 0 && msg->numFrames > 1 && ref->getNextReference())
00821   {
00822     // this looks like a chain of frames (local transfer)
00823     toolbox::mem::Reference *head = ref;
00824     toolbox::mem::Reference *next = 0;
00825     // best to check the complete chain just in case!
00826     unsigned int tested_frames = 1;
00827     next = head;
00828     while((next=next->getNextReference())!=0) ++tested_frames;
00829     FDEBUG(10) << "StorageManager: Head frame has " << tested_frames-1
00830                << " linked frames out of " << msg->numFrames-1 << std::endl;
00831     if(msg->numFrames == tested_frames)
00832     {
00833       // found a complete linked chain from the leading frame
00834       is_local_chain = 1;
00835       FDEBUG(10) << "StorageManager: Leading frame contains a complete linked chain"
00836                  << " - must be local transfer" << std::endl;
00837       FDEBUG(10) << "StorageManager: Breaking the chain" << std::endl;
00838       // break the chain and feed them to the fragment collector
00839       next = head;
00841       for(int iframe=0; iframe <(int)localMsgCopy.numFrames; ++iframe)
00842       {
00843          toolbox::mem::Reference *thisref=next;
00844          next = thisref->getNextReference();
00845          thisref->setNextReference(0);
00846          I2O_MESSAGE_FRAME         *thisstdMsg = (I2O_MESSAGE_FRAME*)thisref->getDataLocation();
00847          I2O_SM_DATA_MESSAGE_FRAME *thismsg    = (I2O_SM_DATA_MESSAGE_FRAME*)thisstdMsg;
00849          // 04-Nov-2008, need to make a local copy of I2O header information.
00850          // Do *NOT* use the dataPtr() method of the thisMsgCopy because this
00851          // copy operation doesn't include the data, only the header!
00852          I2O_SM_DATA_MESSAGE_FRAME thisMsgCopy;
00853          from = static_cast<const char*>(thisref->getDataLocation());
00854          msize = sizeof(I2O_SM_DATA_MESSAGE_FRAME);
00855          dest = (char*) &thisMsgCopy;
00856          std::copy(from, from+msize, dest);
00857          thisMsgCopy.dataSize = msize;
00859          EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
00860          int thislen = thismsg->dataSize;
00861          // ***  must give it the 1 of N for this fragment (starts from 0 in i2o header)
00862          new (b.buffer()) stor::FragEntry(thisref, (char*)(thismsg->dataPtr()), thislen,
00863                                           thismsg->frameCount+1, thismsg->numFrames, Header::ERROR_EVENT, 
00864                                           thismsg->runID, thismsg->eventID, thismsg->outModID,
00865                                           thismsg->fuProcID, thismsg->fuGUID);
00866          b.commit(sizeof(stor::FragEntry));
00868          ++receivedFrames_;
00869          // for bandwidth performance measurements
00870          // Following is wrong for the last frame because frame sent is
00871          // is actually larger than the size taken by actual data
00872          unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_DATA_MESSAGE_FRAME)
00873                                          +thislen;
00874          addMeasurement(actualFrameSize);
00876          // should only do this test if the first data frame from each FU?
00877          // check if run number is the same as that in Run configuration, complain otherwise !!!
00878          // this->runNumber_ comes from the RunBase class that StorageManager inherits from
00879          if(thisMsgCopy.runID != runNumber_)
00880          {
00881            LOG4CPLUS_ERROR(this->getApplicationLogger(),"Run Number from error event stream = "
00882                            << thisMsgCopy.runID << " From " << thisMsgCopy.hltURL
00883                            << " Different from Run Number from configuration = " << runNumber_);
00884          }
00885          // for data sender list update
00886          // thisMsgCopy.frameCount start from 0, but in EventMsg header it starts from 1!
00887          //bool isLocal = true;
00889          //update last error event seen
00890          lastErrorEventSeen_ = thisMsgCopy.eventID;
00892          // TODO need to fix this as the outModId is not valid for error events
00893          /*
00894          int status = 
00895            smrbsenders_.updateSender4data(&thisMsgCopy.hltURL[0], &thisMsgCopy.hltClassName[0],
00896            thisMsgCopy.hltLocalId, thisMsgCopy.hltInstance, thisMsgCopy.hltTid,
00897            thisMsgCopy.runID, thisMsgCopy.eventID, thisMsgCopy.frameCount+1, thisMsgCopy.numFrames,
00898            thisMsgCopy.originalSize, isLocal, thisMsgCopy.outModID);
00899          */
00901          // 13-Aug-2008, KAB - for now, increment the receivedErrorEvent counter
00902          // independent of the result of the updateFUSender4data() call since we
00903          // know that the result is unlikely to be "success"
00904          //if(status == 1) {
00905            ++(receivedErrorEvents_.value_);
00906          //}
00908          /*
00909          if(status == -1) {
00910            LOG4CPLUS_ERROR(this->getApplicationLogger(),
00911                     "updateSender4data: Cannot find RB in Data Sender list!"
00912                     << " For Error Event With URL "
00913                     << thisMsgCopy.hltURL << " class " << thisMsgCopy.hltClassName  << " instance "
00914                     << thisMsgCopy.hltInstance << " Tid " << thisMsgCopy.hltTid);
00915          }
00916          */
00917       }
00919     } else {
00920       // should never get here!
00921       FDEBUG(10) << "StorageManager: Head frame has fewer linked frames "
00922                  << "than expected: abnormal error! " << std::endl;
00923     }
00924   }
00926   if (is_local_chain == 0) 
00927   {
00928     // put pointers into fragment collector queue
00929     EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
00930     // must give it the 1 of N for this fragment (starts from 0 in i2o header)
00931     new (b.buffer()) stor::FragEntry(ref, (char*)(msg->dataPtr()), len,
00932                                      msg->frameCount+1, msg->numFrames, Header::ERROR_EVENT, 
00933                                      msg->runID, msg->eventID, msg->outModID,
00934                                      msg->fuProcID, msg->fuGUID);
00935     b.commit(sizeof(stor::FragEntry));
00936     // Frame release is done in the deleter.
00937     ++receivedFrames_;
00938     // for bandwidth performance measurements
00939     unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_DATA_MESSAGE_FRAME)
00940                                     + len;
00941     addMeasurement(actualFrameSize);
00943     // should only do this test if the first data frame from each FU?
00944     // check if run number is the same as that in Run configuration, complain otherwise !!!
00945     // this->runNumber_ comes from the RunBase class that StorageManager inherits from
00946     if(localMsgCopy.runID != runNumber_)
00947     {
00948       LOG4CPLUS_ERROR(this->getApplicationLogger(),"Run Number from error event stream = "
00949                       << localMsgCopy.runID << " From " << localMsgCopy.hltURL
00950                       << " Different from Run Number from configuration = " << runNumber_);
00951     }
00953     //update last error event seen
00954     lastErrorEventSeen_ = localMsgCopy.eventID;
00956     // for data sender list update
00957     // localMsgCopy.frameCount start from 0, but in EventMsg header it starts from 1!
00958     //bool isLocal = false;
00959     // TODO need to fix this as the outModId is not valid for error events
00960     /*
00961     int status = 
00962       smrbsenders_.updateSender4data(&localMsgCopy.hltURL[0], &localMsgCopy.hltClassName[0],
00963       localMsgCopy.hltLocalId, localMsgCopy.hltInstance, localMsgCopy.hltTid,
00964       localMsgCopy.runID, localMsgCopy.eventID, localMsgCopy.frameCount+1, localMsgCopy.numFrames,
00965       localMsgCopy.originalSize, isLocal, localMsgCopy.outModID);
00966     */
00968     // 13-Aug-2008, KAB - for now, increment the receivedErrorEvent counter
00969     // independent of the result of the updateFUSender4data() call since we
00970     // know that the result is unlikely to be "success"
00971     //if(status == 1) {
00972       ++(receivedErrorEvents_.value_);
00973     //}
00974     /*
00975     if(status == -1) {
00976       LOG4CPLUS_ERROR(this->getApplicationLogger(),
00977                       "updateSender4data: Cannot find RB in Data Sender list!"
00978                       << " For Error Event With URL "
00979                       << localMsgCopy.hltURL << " class " << localMsgCopy.hltClassName  << " instance "
00980                       << localMsgCopy.hltInstance << " Tid " << localMsgCopy.hltTid);
00981     }
00982     */
00983   }
00985   if (  localMsgCopy.frameCount == localMsgCopy.numFrames-1 )
00986     {
00987       string hltClassName(localMsgCopy.hltClassName);
00988       sendDiscardMessage(localMsgCopy.rbBufferID, 
00989                          localMsgCopy.hltInstance, 
00990                          I2O_FU_DATA_DISCARD,
00991                          hltClassName);
00992     }
00993 }
00995 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref)
00996 {
00997   // get the memory pool pointer for statistics if not already set
00998   if(pool_is_set_ == 0)
00999   {
01000     pool_ = ref->getBuffer()->getPool();
01001     pool_is_set_ = 1;
01002   }
01004   I2O_MESSAGE_FRAME         *stdMsg =
01005     (I2O_MESSAGE_FRAME*)ref->getDataLocation();
01006   I2O_SM_DQM_MESSAGE_FRAME *msg    =
01007     (I2O_SM_DQM_MESSAGE_FRAME*)stdMsg;
01008   FDEBUG(10) << "StorageManager: Received DQM message from HLT at " << msg->hltURL 
01009              << " application " << msg->hltClassName << " id " << msg->hltLocalId
01010              << " instance " << msg->hltInstance << " tid " << msg->hltTid
01011              << " rbBufferID " << msg->rbBufferID << " folderID " << msg->folderID
01012              << " fuProcID " << msg->fuProcID  << " fuGUID 0x" << std::hex
01013             << msg->fuGUID << std::dec << std::endl;
01014   FDEBUG(10) << "                 for run " << msg->runID << " eventATUpdate = " << msg->eventAtUpdateID
01015              << " total frames = " << msg->numFrames << std::endl;
01016   FDEBUG(10) << "StorageManager: Frame " << msg->frameCount << " of " 
01017              << msg->numFrames-1 << std::endl;
01018   int len = msg->dataSize;
01019   FDEBUG(10) << "StorageManager: received DQM frame size = " << len << std::endl;
01021   // check the storage Manager is in the Ready state first!
01022   if(fsm_.stateName()->toString() != "Enabled")
01023   {
01024     LOG4CPLUS_ERROR(this->getApplicationLogger(),
01025                        "Received DQM message but not in Enabled state! Current state = "
01026                        << fsm_.stateName()->toString() << " DQMMessage from" << msg->hltURL
01027                        << " application " << msg->hltClassName);
01028     // just release the memory at least - is that what we want to do?
01029     ref->release();
01030     return;
01031   }
01032   ++(dqmRecords_.value_);
01034   // 04-Nov-2008, HWKC and KAB - make local copy of I2O message header so
01035   // that we can use that information even after the Reference is released.
01036   // Do *NOT* use the dataPtr() method of the localMsgCopy because this
01037   // copy operation doesn't include the data, only the header!
01038   I2O_SM_DQM_MESSAGE_FRAME localMsgCopy;
01039   const char* from = static_cast<const char*>(ref->getDataLocation());
01040   unsigned int msize = sizeof(I2O_SM_DQM_MESSAGE_FRAME);
01041   char* dest = (char*) &localMsgCopy;
01042   std::copy(from, from+msize, dest);
01043   localMsgCopy.dataSize = msize;
01045   // If running with local transfers, a chain of I2O frames when posted only has the
01046   // head frame sent. So a single frame can complete a chain for local transfers.
01047   // We need to test for this. Must be head frame, more than one frame
01048   // and next pointer must exist.
01049   // -- we have to break chains due to the way the FragmentCollector frees memory
01050   //    for each frame after processing each as the freeing a chain frees all memory
01051   //    in the chain
01052   int is_local_chain = 0;
01053   if(msg->frameCount == 0 && msg->numFrames > 1 && ref->getNextReference())
01054   {
01055     // this looks like a chain of frames (local transfer)
01056     toolbox::mem::Reference *head = ref;
01057     toolbox::mem::Reference *next = 0;
01058     // best to check the complete chain just in case!
01059     unsigned int tested_frames = 1;
01060     next = head;
01061     while((next=next->getNextReference())!=0) ++tested_frames;
01062     FDEBUG(10) << "StorageManager: DQM Head frame has " << tested_frames-1
01063                << " linked frames out of " << msg->numFrames-1 << std::endl;
01064     if(msg->numFrames == tested_frames)
01065     {
01066       // found a complete linked chain from the leading frame
01067       is_local_chain = 1;
01068       FDEBUG(10) << "StorageManager: Leading frame contains a complete linked chain"
01069                  << " - must be local transfer" << std::endl;
01070       FDEBUG(10) << "StorageManager: Breaking the chain" << std::endl;
01071       // break the chain and feed them to the fragment collector
01072       next = head;
01074       for(int iframe=0; iframe <(int)localMsgCopy.numFrames; ++iframe)
01075       {
01076          toolbox::mem::Reference *thisref=next;
01077          next = thisref->getNextReference();
01078          thisref->setNextReference(0);
01079          I2O_MESSAGE_FRAME         *thisstdMsg = (I2O_MESSAGE_FRAME*)thisref->getDataLocation();
01080          I2O_SM_DQM_MESSAGE_FRAME *thismsg    = (I2O_SM_DQM_MESSAGE_FRAME*)thisstdMsg;
01081          EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
01082          int thislen = thismsg->dataSize;
01083          // ***  must give it the 1 of N for this fragment (starts from 0 in i2o header)
01084          new (b.buffer()) stor::FragEntry(thisref, (char*)(thismsg->dataPtr()), thislen,
01085                                           thismsg->frameCount+1, thismsg->numFrames, Header::DQM_EVENT, 
01086                                           thismsg->runID, thismsg->eventAtUpdateID, thismsg->folderID,
01087                                           thismsg->fuProcID, thismsg->fuGUID);
01088          b.commit(sizeof(stor::FragEntry));
01090          // BE CAREFUL not to use thismsg after this point because it
01091          // may have been released already by the FragmentCollector!
01092          // If it needs to be used, make a copy.
01094          ++receivedFrames_;
01095          // for bandwidth performance measurements
01096          // Following is wrong for the last frame because frame sent is
01097          // is actually larger than the size taken by actual data
01098          unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_DQM_MESSAGE_FRAME)
01099                                          +thislen;
01100          addMeasurement(actualFrameSize);
01102          // no data sender list update yet for DQM data, should add it here
01103       }
01105     } else {
01106       // should never get here!
01107       FDEBUG(10) << "StorageManager: DQM Head frame has fewer linked frames "
01108                  << "than expected: abnormal error! " << std::endl;
01109       LOG4CPLUS_ERROR(this->getApplicationLogger(),"DQM Head frame has fewer linked frames" 
01110                       << " than expected: abnormal error! ");
01111     }
01112   }
01114   if (is_local_chain == 0) 
01115   {
01116     // put pointers into fragment collector queue
01117     EventBuffer::ProducerBuffer b(jc_->getFragmentQueue());
01118     // must give it the 1 of N for this fragment (starts from 0 in i2o header)
01119     new (b.buffer()) stor::FragEntry(ref, (char*)(msg->dataPtr()), len,
01120                                      msg->frameCount+1, msg->numFrames, Header::DQM_EVENT, 
01121                                      msg->runID, msg->eventAtUpdateID, msg->folderID,
01122                                      msg->fuProcID, msg->fuGUID);
01123     b.commit(sizeof(stor::FragEntry));
01124     // Frame release is done in the deleter.
01125     ++receivedFrames_;
01126     // for bandwidth performance measurements
01127     unsigned long actualFrameSize = (unsigned long)sizeof(I2O_SM_DQM_MESSAGE_FRAME)
01128                                     + len;
01129     addMeasurement(actualFrameSize);
01131     // no data sender list update yet for DQM data, should add it here
01132   }
01134   if (  localMsgCopy.frameCount == localMsgCopy.numFrames-1 )
01135     {
01136       string hltClassName(localMsgCopy.hltClassName);
01137       sendDiscardMessage(localMsgCopy.rbBufferID, 
01138                          localMsgCopy.hltInstance, 
01139                          I2O_FU_DQM_DISCARD,
01140                          hltClassName);
01141     }
01142 }
01145 void StorageManager::addMeasurement(unsigned long size)
01146 {
01147   // for bandwidth performance measurements, first sample based
01148   if ( pmeter_->addSample(size) )
01149   {
01150     // Copy measurements for our record
01151     stor::SMPerfStats stats = pmeter_->getStats();
01153     instantBandwidth_= stats.shortTermCounter_->getValueRate();
01154     instantRate_     = stats.shortTermCounter_->getSampleRate();
01155     instantLatency_  = 1000000.0 / instantRate_;
01157     double now = ForeverCounter::getCurrentTime();
01158     totalSamples_    = stats.longTermCounter_->getSampleCount();
01159     duration_        = stats.longTermCounter_->getDuration(now);
01160     meanBandwidth_   = stats.longTermCounter_->getValueRate(now);
01161     meanRate_        = stats.longTermCounter_->getSampleRate(now);
01162     meanLatency_     = 1000000.0 / meanRate_;
01164     maxBandwidth_    = stats.maxBandwidth_;
01165     minBandwidth_    = stats.minBandwidth_;
01166   }
01168   // for time period bandwidth performance measurements
01169   if ( pmeter_->getStats().shortPeriodCounter_->hasValidResult() )
01170   {
01171     // Copy measurements for our record
01172     stor::SMPerfStats stats = pmeter_->getStats();
01174     instantBandwidth2_= stats.shortPeriodCounter_->getValueRate();
01175     instantRate2_     = stats.shortPeriodCounter_->getSampleRate();
01176     instantLatency2_  = 1000000.0 / instantRate2_;
01178     double now = ForeverCounter::getCurrentTime();
01179     totalSamples2_    = stats.longTermCounter_->getSampleCount();
01180     duration2_        = stats.longTermCounter_->getDuration(now);
01181     meanBandwidth2_   = stats.longTermCounter_->getValueRate(now);
01182     meanRate2_        = stats.longTermCounter_->getSampleRate(now);
01183     meanLatency2_     = 1000000.0 / meanRate2_;
01185     maxBandwidth2_    = stats.maxBandwidth2_;
01186     minBandwidth2_    = stats.minBandwidth2_;
01187   }
01188   receivedVolume_ = pmeter_->totalvolumemb();
01190   // TODO fixme: Find a better place to put this testing of the Fragment Collector thread status!
01191   // leave this for now until we have the transition available and have clean up code
01192   if(stor::getSMFC_exceptionStatus()) {
01193     // there was a fatal exception in the Fragmentation Collector and
01194     // we want to go to a fail state
01195     //reasonForFailedState_  = stor::getSMFC_reason4Exception();
01196     //fsm_.fireFailed(reasonForFailedState_,this);
01197     edm::LogError("StorageManager") << "Fatal problem in FragmentCollector thread detected! \n"
01198        << stor::getSMFC_reason4Exception();
01199     //@@EM added state transition to failed
01200     reasonForFailedState_ = stor::getSMFC_reason4Exception();
01201     fsm_.fireFailed(reasonForFailedState_,this);
01203   }
01204 }
01207 void StorageManager::defaultWebPage(xgi::Input *in, xgi::Output *out)
01208   throw (xgi::exception::Exception)
01209 {
01210   *out << "<html>"                                                   << endl;
01211   *out << "<head>"                                                   << endl;
01212   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
01213   *out << " href=\"/" <<  getApplicationDescriptor()->getURN()
01214        << "/styles.css\"/>"                   << endl;
01215   *out << "<title>" << getApplicationDescriptor()->getClassName() << " instance "
01216        << getApplicationDescriptor()->getInstance()
01217        << "</title>"     << endl;
01218   *out << "</head><body>"                                            << endl;
01219     *out << "<table border=\"0\" width=\"100%\">"                      << endl;
01220     *out << "<tr>"                                                     << endl;
01221     *out << "  <td align=\"left\">"                                    << endl;
01222     *out << "    <img"                                                 << endl;
01223     *out << "     align=\"middle\""                                    << endl;
01224     *out << "     src=\"/evf/images/smicon.jpg\""                      << endl;
01225     *out << "     alt=\"main\""                                        << endl;
01226     *out << "     width=\"64\""                                        << endl;
01227     *out << "     height=\"64\""                                       << endl;
01228     *out << "     border=\"\"/>"                                       << endl;
01229     *out << "    <b>"                                                  << endl;
01230     *out << getApplicationDescriptor()->getClassName() << " instance "
01231          << getApplicationDescriptor()->getInstance()                  << endl;
01232     *out << "      " << fsm_.stateName()->toString()                   << endl;
01233     *out << "    </b>"                                                 << endl;
01234     *out << "  </td>"                                                  << endl;
01235     *out << "  <td width=\"32\">"                                      << endl;
01236     *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
01237     *out << "      <img"                                               << endl;
01238     *out << "       align=\"middle\""                                  << endl;
01239     *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""    << endl;
01240     *out << "       alt=\"HyperDAQ\""                                  << endl;
01241     *out << "       width=\"32\""                                      << endl;
01242     *out << "       height=\"32\""                                      << endl;
01243     *out << "       border=\"\"/>"                                     << endl;
01244     *out << "    </a>"                                                 << endl;
01245     *out << "  </td>"                                                  << endl;
01246     *out << "  <td width=\"32\">"                                      << endl;
01247     *out << "  </td>"                                                  << endl;
01248     /* @@EM commented out till there is something to link to
01249     *out << "  <td width=\"32\">"                                      << endl;
01250     *out << "    <a href=\"/" << getApplicationDescriptor()->getURN()
01251          << "/debug\">"                   << endl;
01252     *out << "      <img"                                               << endl;
01253     *out << "       align=\"middle\""                                  << endl;
01254     *out << "       src=\"/evf/images/bugicon.jpg\""                   << endl;
01255     *out << "       alt=\"debug\""                                     << endl;
01256     *out << "       width=\"32\""                                      << endl;
01257     *out << "       height=\"32\""                                     << endl;
01258     *out << "       border=\"\"/>"                                     << endl;
01259     *out << "    </a>"                                                 << endl;
01260     *out << "  </td>"                                                  << endl;
01261     */
01262     *out << "</tr>"                                                    << endl;
01263     if(fsm_.stateName()->value_ == "Failed")
01264     {
01265       *out << "<tr>"                                         << endl;
01266       *out << " <td>"                                        << endl;
01267       *out << "<textarea rows=" << 5 << " cols=60 scroll=yes";
01268       *out << " readonly title=\"Reason For Failed\">"               << endl;
01269       *out << reasonForFailedState_                                  << endl;
01270       *out << "</textarea>"                                          << endl;
01271       *out << " </td>"                                       << endl;
01272       *out << "</tr>"                                        << endl;
01273     }
01274     *out << "</table>"                                                 << endl;
01276   *out << "<hr/>"                                                    << endl;
01277   *out << "<table>"                                                  << endl;
01278   *out << "<tr valign=\"top\">"                                      << endl;
01279   *out << "  <td>"                                                   << endl;
01281   *out << "<table frame=\"void\" rules=\"groups\" class=\"states\""      << endl;
01282   *out << " readonly title=\"Note: parts of this info updates every 10 sec !!!\">"<< endl;
01283   *out << "<colgroup> <colgroup align=\"right\">"                        << endl;
01284     *out << "  <tr>"                                                     << endl;
01285     *out << "    <th colspan=7>"                                         << endl;
01286     *out << "      " << "Storage Manager Statistics"                     << endl;
01287     *out << "    </th>"                                                  << endl;
01288     *out << "  </tr>"                                                    << endl;
01289         *out << "<tr>" << endl;
01290           *out << "<td >" << endl;
01291           *out << "Run Number" << endl;
01292           *out << "</td>" << endl;
01293           *out << "<td align=right>" << endl;
01294           *out << runNumber_ << endl;
01295           *out << "</td>" << endl;
01296           *out << "<td colspan=5>" << endl;
01297           *out << "</td>" << endl;
01298         *out << "</tr>" << endl;
01299         *out << "<tr class=\"special\">" << endl;
01300           *out << "<td >" << endl;
01301           *out << "Total (Non-unique) Events Received" << endl;
01302           *out << "</td>" << endl;
01303           *out << "<td align=right>" << endl;
01304           *out << receivedEvents_ << endl;
01305           *out << "</td>" << endl;
01306           *out << "<td colspan=5>" << endl;
01307           *out << "</td>" << endl;
01308         *out << "</tr>" << endl;
01309         *out << "<tr class=\"special\">" << endl;
01310           *out << "<td >" << endl;
01311           *out << "Output Module" << endl;
01312           *out << "</td>" << endl;
01313           *out << "<td align=center>" << endl;
01314           *out << "Events" << endl;
01315           *out << "</td>" << endl;
01316           *out << "<td align=center>" << endl;
01317           *out << "Size (MB)" << endl;
01318           *out << "</td>" << endl;
01319           *out << "<td align=center>" << endl;
01320           *out << "Size/Evt (KB)" << endl;
01321           *out << "</td>" << endl;
01322           *out << "<td align=center>" << endl;
01323           *out << "RMS (KB)" << endl;
01324           *out << "</td>" << endl;
01325           *out << "<td align=center>" << endl;
01326           *out << "Min (KB)" << endl;
01327           *out << "</td>" << endl;
01328           *out << "<td align=center>" << endl;
01329           *out << "Max (KB)" << endl;
01330           *out << "</td>" << endl;
01331         *out << "</tr>" << endl;
01332         boost::shared_ptr<InitMsgCollection> initMsgCollection;
01333         if(jc_.get() != NULL && jc_->getInitMsgCollection().get() != NULL) {
01334           initMsgCollection = jc_->getInitMsgCollection();
01335         }
01336         idMap_iter oi(modId2ModOutMap_.begin()), oe(modId2ModOutMap_.end());
01337         for( ; oi != oe; ++oi) {
01338           std::string outputModuleLabel = oi->second;
01339           if (initMsgCollection.get() != NULL &&
01340               initMsgCollection->getOutputModuleName(oi->first) != "") {
01341             outputModuleLabel = initMsgCollection->getOutputModuleName(oi->first);
01342           }
01343           *out << "<tr>" << endl;
01344             *out << "<td >" << endl;
01345             *out << outputModuleLabel << endl;
01346             *out << "</td>" << endl;
01347             *out << "<td align=right>" << endl;
01348             //*out << receivedEventsMap_[oi->second] << endl;
01349             *out << receivedEventsMap_[oi->second] << " (" << avEventSizeMap_[oi->second]->getSampleCount() << ") "<< endl;
01350             *out << "</td>" << endl;
01351             *out << "<td align=right>" << endl;
01352             *out << avEventSizeMap_[oi->second]->getValueSum()/(double)0x100000 << endl;
01353             *out << "</td>" << endl;
01354             *out << "<td align=right>" << endl;
01355             *out << avEventSizeMap_[oi->second]->getValueAverage()/(double)0x400 << endl;
01356             *out << "</td>" << endl;
01357             *out << "<td align=right>" << endl;
01358             *out << avEventSizeMap_[oi->second]->getValueRMS()/(double)0x400 << endl;
01359             *out << "</td>" << endl;
01360             *out << "<td align=right>" << endl;
01361             *out << avEventSizeMap_[oi->second]->getValueMin()/(double)0x400 << endl;
01362             *out << "</td>" << endl;
01363             *out << "<td align=right>" << endl;
01364             *out << avEventSizeMap_[oi->second]->getValueMax()/(double)0x400 << endl;
01365             *out << "</td>" << endl;
01366           *out << "</tr>" << endl;
01367         }
01368         *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"7\"></td></tr>" << endl;
01370         *out << "<tr class=\"special\">" << endl;
01371           *out << "<td >" << endl;
01372           *out << "Events Stored (updated every 10s)" << endl;
01373           *out << "</td>" << endl;
01374           *out << "<td align=right>" << endl;
01375           *out << storedEvents_ << endl;
01376           *out << "</td>" << endl;
01377           *out << "<td colspan=5>" << endl;
01378           *out << "</td>" << endl;
01379         *out << "</tr>" << endl;
01380         xdata::Vector<xdata::String>::iterator ni(namesOfStream_.begin());
01381         xdata::Vector<xdata::UnsignedInteger32>::iterator si(storedEventsInStream_.begin()), 
01382           se(storedEventsInStream_.end());
01383         for( ; si != se; ++si) {
01384           *out << "<tr>" << endl;
01385             *out << "<td >" << endl;
01386             *out << "Events Stored for Stream " << ni->value_ << endl;
01387             *out << "</td>" << endl;
01388             *out << "<td align=right>" << endl;
01389             *out << si->value_ << endl;
01390             *out << "</td>" << endl;
01391             *out << "<td colspan=5>" << endl;
01392             *out << "</td>" << endl;
01393             ++ni;
01394           *out << "</tr>" << endl;
01395         }
01396         *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"7\"></td></tr>" << endl;
01397         *out << "<tr>" << endl;
01398           *out << "<td >" << endl;
01399           *out << "Last Event ID" << endl;
01400           *out << "</td>" << endl;
01401           *out << "<td align=right>" << endl;
01402           *out << lastEventSeen_ << endl;
01403           *out << "</td>" << endl;
01404           *out << "<td colspan=5>" << endl;
01405           *out << "</td>" << endl;
01406         *out << "</tr>" << endl;
01407         *out << "<tr class=\"special\">" << endl;
01408           *out << "<td >" << endl;
01409           *out << "Error Events Received" << endl;
01410           *out << "</td>" << endl;
01411           *out << "<td align=right>" << endl;
01412           *out << receivedErrorEvents_ << endl;
01413           *out << "</td>" << endl;
01414           *out << "<td colspan=5>" << endl;
01415           *out << "</td>" << endl;
01416         *out << "</tr>" << endl;
01417         *out << "<tr>" << endl;
01418           *out << "<td >" << endl;
01419           *out << "Last Error Event ID" << endl;
01420           *out << "</td>" << endl;
01421           *out << "<td align=right>" << endl;
01422           *out << lastErrorEventSeen_ << endl;
01423           *out << "</td>" << endl;
01424           *out << "<td colspan=5>" << endl;
01425           *out << "</td>" << endl;
01426         *out << "</tr>" << endl;
01427         *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"7\"></td></tr>" << endl;
01428         int nD = nLogicalDisk_;
01429         if (nD == 0) nD=1;
01430         for(int i=0;i<nD;++i) {
01431            string path(filePath_);
01432            if(nLogicalDisk_>0) {
01433               std::ostringstream oss;
01434               oss << "/" << setfill('0') << std::setw(2) << i; 
01435               path += oss.str();
01436            }
01437            struct statfs64 buf;
01438            int retVal = statfs64(path.c_str(), &buf);
01439            double btotal = 0;
01440            double bfree = 0;
01441            unsigned int used = 0;
01442            if(retVal==0) {
01443               unsigned int blksize = buf.f_bsize;
01444               btotal = buf.f_blocks * blksize / 1024 / 1024 /1024;
01445               bfree  = buf.f_bavail  * blksize / 1024 / 1024 /1024;
01446               used   = (int)(100 * (1. - bfree / btotal)); 
01447            }
01448         *out << "<tr>" << endl;
01449           *out << "<td >" << endl;
01450           *out << "Disk " << i << " usage " << endl;
01451           *out << "</td>" << endl;
01452           if(used>89)
01453              *out << "<td align=right bgcolor=\"#EF5A10\">" << endl;
01454           else 
01455              *out << "<td align=right>" << endl;
01456           *out << used << "% (" << btotal-bfree << " of " << btotal << " GB)" << endl;
01457           *out << "</td>" << endl;
01458           *out << "<td colspan=5>" << endl;
01459           *out << "</td>" << endl;
01460         *out << "</tr>" << endl;
01461         }
01462         *out << "<tr>" << endl;
01463           *out << "<td >" << endl;
01464           *out << "# CopyWorker" << endl;
01465           *out << "</td>" << endl;
01466           *out << "<td align=right>" << endl;
01467           int ps1 = system("exit `ps ax | grep CopyWorker | grep perl | grep -v grep | wc -l`") / 256;
01468           *out << ps1 << endl;
01469           *out << "</td>" << endl;
01470           *out << "<td colspan=5>" << endl;
01471           *out << "</td>" << endl;
01472         *out << "</tr>" << endl;
01473         *out << "<tr>" << endl;
01474           *out << "<td >" << endl;
01475           *out << "# InjectWorker" << endl;
01476           *out << "</td>" << endl;
01477           *out << "<td align=right>" << endl;
01478           int ps2 = system("exit `ps ax | grep InjectWorker | grep perl | grep -v grep | wc -l`") / 256;
01479           *out << ps2 << endl;
01480           *out << "</td>" << endl;
01481           *out << "<td colspan=5>" << endl;
01482           *out << "</td>" << endl;
01483         *out << "</tr>" << endl;
01484     *out << "  <tr>"                                                   << endl;
01485     *out << "    <th colspan=7>"                                       << endl;
01486     *out << "      " << "Output Streams (updated only every 10 sec)"          << endl;
01487     *out << "    </th>"                                                << endl;
01488     *out << "  </tr>"                                                  << endl;
01489         *out << "<tr class=\"special\">"                               << endl;
01490         *out << "<td >" << endl;
01491         *out << "name" << endl;
01492         *out << "</td>" << endl;
01493         *out << "<td align=right>" << endl;
01494         *out << "nfiles" << endl;
01495         *out << "</td>" << endl;
01496         *out << "<td align=right>" << endl;
01497         *out << "nevents" << endl;
01498         *out << "</td>" << endl;
01499         *out << "<td align=right>" << endl;
01500         *out << "size (kB)" << endl;
01501         *out << "</td>" << endl;
01502         *out << "<td colspan=3>" << endl;
01503         *out << "</td>" << endl;
01504         *out << "</tr>" << endl;
01506     for(ismap it = streams_.begin(); it != streams_.end(); ++it)
01507       {
01508         *out << "<tr>" << endl;
01509         *out << "<td >" << endl;
01510         *out << (*it).first << endl;
01511         *out << "</td>" << endl;
01512         *out << "<td align=right>" << endl;
01513         *out << (*it).second.nclosedfiles_ << endl;
01514         *out << "</td>" << endl;
01515         *out << "<td align=right>" << endl;
01516         *out << (*it).second.nevents_ << endl;
01517         *out << "</td>" << endl;
01518         *out << "<td align=right>" << endl;
01519         *out << (*it).second.totSizeInkBytes_ << endl;
01520         *out << "</td>" << endl;
01521         *out << "<td colspan=3>" << endl;
01522         *out << "</td>" << endl;
01523         *out << "  </tr>" << endl;
01524       }
01525     *out << "</table>" << endl;
01527   *out << "<table frame=\"void\" rules=\"groups\" class=\"states\">" << endl;
01528   *out << "<colgroup> <colgroup align=\"rigth\">"                    << endl;
01529     *out << "  <tr>"                                                   << endl;
01530     *out << "    <th colspan=2>"                                       << endl;
01531     *out << "      " << "Received Data Statistics "                    << endl;
01532     *out << "    </th>"                                                << endl;
01533     *out << "  </tr>"                                                  << endl;
01535         *out << "<tr>" << endl;
01536         *out << "<th >" << endl;
01537         *out << "Parameter" << endl;
01538         *out << "</th>" << endl;
01539         *out << "<th>" << endl;
01540         *out << "Value" << endl;
01541         *out << "</th>" << endl;
01542         *out << "</tr>" << endl;
01543         *out << "<tr>" << endl;
01544           *out << "<td >" << endl;
01545           *out << "Frames Received" << endl;
01546           *out << "</td>" << endl;
01547           *out << "<td align=right>" << endl;
01548           *out << receivedFrames_ << endl;
01549           *out << "</td>" << endl;
01550         *out << "  </tr>" << endl;
01551         *out << "<tr>" << endl;
01552           *out << "<td >" << endl;
01553           *out << "DQM Records Received" << endl;
01554           *out << "</td>" << endl;
01555           *out << "<td align=right>" << endl;
01556           *out << dqmRecords_ << endl;
01557           *out << "</td>" << endl;
01558         *out << "  </tr>" << endl;
01559         if(pool_is_set_ == 1) 
01560         {
01561           *out << "<tr>" << endl;
01562             *out << "<td >" << endl;
01563             *out << "Memory Used (Bytes)" << endl;
01564             *out << "</td>" << endl;
01565             *out << "<td align=right>" << endl;
01566             *out << pool_->getMemoryUsage().getUsed() << endl;
01567             *out << "</td>" << endl;
01568           *out << "  </tr>" << endl;
01569         } else {
01570           *out << "<tr>" << endl;
01571             *out << "<td >" << endl;
01572             *out << "Memory Pool pointer not yet available" << endl;
01573             *out << "</td>" << endl;
01574           *out << "  </tr>" << endl;
01575         }
01576 // performance statistics
01577     *out << "  <tr>"                                                   << endl;
01578     *out << "    <th colspan=2>"                                       << endl;
01579     *out << "      " << "Statistics for last " << samples_ << " frames" << " (and last " << period4samples_ << " sec)" << endl;
01580     *out << "    </th>"                                                << endl;
01581     *out << "  </tr>"                                                  << endl;
01582         *out << "<tr>" << endl;
01583           *out << "<td >" << endl;
01584           *out << "Bandwidth (MB/s)" << endl;
01585           *out << "</td>" << endl;
01586           *out << "<td align=right>" << endl;
01587           *out << instantBandwidth_ << " (" << instantBandwidth2_ << ")" << endl;
01588           *out << "</td>" << endl;
01589         *out << "  </tr>" << endl;
01590         *out << "<tr>" << endl;
01591           *out << "<td >" << endl;
01592           *out << "Rate (Frames/s)" << endl;
01593           *out << "</td>" << endl;
01594           *out << "<td align=right>" << endl;
01595           *out << instantRate_ << " (" << instantRate2_ << ")" << endl;
01596           *out << "</td>" << endl;
01597         *out << "  </tr>" << endl;
01598         *out << "<tr>" << endl;
01599           *out << "<td >" << endl;
01600           *out << "Latency (us/frame)" << endl;
01601           *out << "</td>" << endl;
01602           *out << "<td align=right>" << endl;
01603           *out << instantLatency_ << " (" << instantLatency2_ << ")" << endl;
01604           *out << "</td>" << endl;
01605         *out << "  </tr>" << endl;
01606         *out << "<tr>" << endl;
01607           *out << "<td >" << endl;
01608           *out << "Maximum Bandwidth (MB/s)" << endl;
01609           *out << "</td>" << endl;
01610           *out << "<td align=right>" << endl;
01611           *out << maxBandwidth_ << " (" << maxBandwidth2_ << ")" << endl;
01612           *out << "</td>" << endl;
01613         *out << "  </tr>" << endl;
01614         *out << "<tr>" << endl;
01615           *out << "<td >" << endl;
01616           *out << "Minimum Bandwidth (MB/s)" << endl;
01617           *out << "</td>" << endl;
01618           *out << "<td align=right>" << endl;
01619           *out << minBandwidth_ << " (" << minBandwidth2_ << ")" << endl;
01620           *out << "</td>" << endl;
01621         *out << "  </tr>" << endl;
01622 // mean performance statistics for whole run
01623     *out << "  <tr>"                                                   << endl;
01624     *out << "    <th colspan=2>"                                       << endl;
01625     *out << "      " << "Mean Performance for " << totalSamples_ << " (" << totalSamples2_ << ")" << " frames, duration "
01626          << duration_ << " (" << duration2_ << ")" << " seconds" << endl;
01627     *out << "    </th>"                                                << endl;
01628     *out << "  </tr>"                                                  << endl;
01629         *out << "<tr>" << endl;
01630           *out << "<td >" << endl;
01631           *out << "Bandwidth (MB/s)" << endl;
01632           *out << "</td>" << endl;
01633           *out << "<td align=right>" << endl;
01634           *out << meanBandwidth_ << " (" << meanBandwidth2_ << ")" << endl;
01635           *out << "</td>" << endl;
01636         *out << "  </tr>" << endl;
01637         *out << "<tr>" << endl;
01638           *out << "<td >" << endl;
01639           *out << "Rate (Frames/s)" << endl;
01640           *out << "</td>" << endl;
01641           *out << "<td align=right>" << endl;
01642           *out << meanRate_ << " (" << meanRate2_ << ")" << endl;
01643           *out << "</td>" << endl;
01644         *out << "  </tr>" << endl;
01645         *out << "<tr>" << endl;
01646           *out << "<td >" << endl;
01647           *out << "Latency (us/frame)" << endl;
01648           *out << "</td>" << endl;
01649           *out << "<td align=right>" << endl;
01650           *out << meanLatency_ << " (" << meanLatency2_ << ")" << endl;
01651           *out << "</td>" << endl;
01652         *out << "  </tr>" << endl;
01653         *out << "<tr>" << endl;
01654           *out << "<td >" << endl;
01655           *out << "Total Volume Received (MB)" << endl;
01656           *out << "</td>" << endl;
01657           *out << "<td align=right>" << endl;
01658           *out << receivedVolume_ << endl;
01659           *out << "</td>" << endl;
01660         *out << "  </tr>" << endl;
01662   *out << "</table>" << endl;
01664 // statistics for stored data
01666   *out << "<table frame=\"void\" rules=\"groups\" class=\"states\">" << endl;
01667   *out << "<colgroup> <colgroup align=\"rigth\">"                    << endl;
01668     *out << "  <tr>"                                                   << endl;
01669     *out << "    <th colspan=2>"                                       << endl;
01670     *out << "      " << "Stored Data Statistics (updated every 10s) "                    << endl;
01671     *out << "    </th>"                                                << endl;
01672     *out << "  </tr>"                                                  << endl;
01674         *out << "<tr>" << endl;
01675         *out << "<th >" << endl;
01676         *out << "Parameter" << endl;
01677         *out << "</th>" << endl;
01678         *out << "<th>" << endl;
01679         *out << "Value" << endl;
01680         *out << "</th>" << endl;
01681         *out << "</tr>" << endl;
01682         *out << "<tr>" << endl;
01683           *out << "<td >" << endl;
01684           *out << "(Non-unique) Events Stored" << endl;
01685           *out << "</td>" << endl;
01686           *out << "<td align=right>" << endl;
01687           *out << store_totalSamples_ << endl;
01688           *out << "</td>" << endl;
01689         *out << "  </tr>" << endl;
01690 // performance statistics
01691     *out << "  <tr>"                                                   << endl;
01692     *out << "    <th colspan=2>"                                       << endl;
01693     *out << "      " << "Statistics for last " << store_samples_ << " events" << " (and last " << store_period4samples_ << " sec)" << endl;
01694     *out << "    </th>"                                                << endl;
01695     *out << "  </tr>"                                                  << endl;
01696         *out << "<tr>" << endl;
01697           *out << "<td >" << endl;
01698           *out << "Bandwidth (MB/s)" << endl;
01699           *out << "</td>" << endl;
01700           *out << "<td align=right>" << endl;
01701           *out << store_instantBandwidth_ << " (" << store_instantBandwidth2_ << ")" << endl;
01702           *out << "</td>" << endl;
01703         *out << "  </tr>" << endl;
01704         *out << "<tr>" << endl;
01705           *out << "<td >" << endl;
01706           *out << "Rate (Frames/s)" << endl;
01707           *out << "</td>" << endl;
01708           *out << "<td align=right>" << endl;
01709           *out << store_instantRate_ << " (" << store_instantRate2_ << ")" << endl;
01710           *out << "</td>" << endl;
01711         *out << "  </tr>" << endl;
01712         *out << "<tr>" << endl;
01713           *out << "<td >" << endl;
01714           *out << "Latency (us/frame)" << endl;
01715           *out << "</td>" << endl;
01716           *out << "<td align=right>" << endl;
01717           *out << store_instantLatency_ << " (" << store_instantLatency2_ << ")" << endl;
01718           *out << "</td>" << endl;
01719         *out << "  </tr>" << endl;
01720         *out << "<tr>" << endl;
01721           *out << "<td >" << endl;
01722           *out << "Maximum Bandwidth (MB/s)" << endl;
01723           *out << "</td>" << endl;
01724           *out << "<td align=right>" << endl;
01725           *out << store_maxBandwidth_ << " (" << store_maxBandwidth2_ << ")" << endl;
01726           *out << "</td>" << endl;
01727         *out << "  </tr>" << endl;
01728         *out << "<tr>" << endl;
01729           *out << "<td >" << endl;
01730           *out << "Minimum Bandwidth (MB/s)" << endl;
01731           *out << "</td>" << endl;
01732           *out << "<td align=right>" << endl;
01733           *out << store_minBandwidth_ << " (" << store_minBandwidth2_ << ")" << endl;
01734           *out << "</td>" << endl;
01735         *out << "  </tr>" << endl;
01736 // mean performance statistics for whole run
01737     *out << "  <tr>"                                                   << endl;
01738     *out << "    <th colspan=2>"                                       << endl;
01739     *out << "      " << "Mean Performance for " << store_totalSamples_ << " (" << store_totalSamples2_ << ")" << " events, duration "
01740          << store_duration_ << " (" << store_duration2_ << ")" << " seconds" << endl;
01741     *out << "    </th>"                                                << endl;
01742     *out << "  </tr>"                                                  << endl;
01743         *out << "<tr>" << endl;
01744           *out << "<td >" << endl;
01745           *out << "Bandwidth (MB/s)" << endl;
01746           *out << "</td>" << endl;
01747           *out << "<td align=right>" << endl;
01748           *out << store_meanBandwidth_ << " (" << store_meanBandwidth2_ << ")" << endl;
01749           *out << "</td>" << endl;
01750         *out << "  </tr>" << endl;
01751         *out << "<tr>" << endl;
01752           *out << "<td >" << endl;
01753           *out << "Rate (Frames/s)" << endl;
01754           *out << "</td>" << endl;
01755           *out << "<td align=right>" << endl;
01756           *out << store_meanRate_ << " (" << store_meanRate2_ << ")" << endl;
01757           *out << "</td>" << endl;
01758         *out << "  </tr>" << endl;
01759         *out << "<tr>" << endl;
01760           *out << "<td >" << endl;
01761           *out << "Latency (us/frame)" << endl;
01762           *out << "</td>" << endl;
01763           *out << "<td align=right>" << endl;
01764           *out << store_meanLatency_ << " (" << store_meanLatency2_ << ")" << endl;
01765           *out << "</td>" << endl;
01766         *out << "  </tr>" << endl;
01767         *out << "<tr>" << endl;
01768           *out << "<td >" << endl;
01769           *out << "Total Volume Received (MB)" << endl;
01770           *out << "</td>" << endl;
01771           *out << "<td align=right>" << endl;
01772           *out << store_receivedVolume_ << endl;
01773           *out << "</td>" << endl;
01774         *out << "  </tr>" << endl;
01776   *out << "</table>" << endl;
01778   *out << "  </td>"                                                  << endl;
01779   *out << "</table>"                                                 << endl;
01780 // now for RB sender list statistics
01781   *out << "<hr/>"                                                    << endl;
01782   *out << "<table>"                                                  << endl;
01783   *out << "<tr valign=\"top\">"                                      << endl;
01784   *out << "  <td>"                                                   << endl;
01786   *out << "<table frame=\"void\" rules=\"groups\" class=\"states\">" << endl;
01787   *out << "<colgroup> <colgroup align=\"rigth\">"                    << endl;
01788     *out << "  <tr>"                                                   << endl;
01789     *out << "    <th colspan=2>"                                       << endl;
01790     *out << "      " << "RB Sender Information"                            << endl;
01791     *out << "    </th>"                                                << endl;
01792     *out << "  </tr>"                                                  << endl;
01794     *out << "<tr>" << endl;
01795     *out << "<th >" << endl;
01796     *out << "Parameter" << endl;
01797     *out << "</th>" << endl;
01798     *out << "<th>" << endl;
01799     *out << "Value" << endl;
01800     *out << "</th>" << endl;
01801     *out << "</tr>" << endl;
01802         *out << "<tr>" << endl;
01803           *out << "<td >" << endl;
01804           *out << "Number of RB Senders" << endl;
01805           *out << "</td>" << endl;
01806           *out << "<td>" << endl;
01807           *out << smrbsenders_.numberOfRB() << endl;
01808           *out << "</td>" << endl;
01809         *out << "  </tr>" << endl;
01810         *out << "<tr>" << endl;
01811           *out << "<td >" << endl;
01812           *out << "Number of OM per FU" << endl;
01813           *out << "</td>" << endl;
01814           *out << "<td>" << endl;
01815           *out << smrbsenders_.numberOfOM() << endl;
01816           *out << "</td>" << endl;
01817         *out << "  </tr>" << endl;
01818         *out << "<tr>" << endl;
01819           *out << "<td >" << endl;
01820           *out << "Sanity check number in sender list" << endl;
01821           *out << "</td>" << endl;
01822           *out << "<td>" << endl;
01823           *out << smrbsenders_.size() << endl;
01824           *out << "</td>" << endl;
01825         *out << "  </tr>" << endl;
01827   *out << "</table>" << endl;
01829   *out << "<table frame=\"void\" rules=\"groups\" class=\"states\">" << endl;
01830   *out << "<colgroup> <colgroup align=\"rigth\">"                    << endl;
01831     *out << "  <tr>"                                                   << endl;
01832     *out << "    <th colspan=2>"                                       << endl;
01833     *out << "      " << "SM Configuration Information "                << endl;
01834     *out << "    </th>"                                                << endl;
01835     *out << "  </tr>"                                                  << endl;
01837     *out << "<tr>" << endl;
01838     *out << "<th >" << endl;
01839     *out << "Parameter" << endl;
01840     *out << "</th>" << endl;
01841     *out << "<th>" << endl;
01842     *out << "Value" << endl;
01843     *out << "</th>" << endl;
01844     *out << "</tr>" << endl;
01845     *out << "<tr>" << endl;
01846       *out << "<td >" << endl;
01847       *out << "SM CVS Version" << endl;
01848       *out << "</td>" << endl;
01849       *out << "<td>" << endl;
01850       *out << sm_cvs_version_ << endl;
01851       *out << "</td>" << endl;
01852     *out << "  </tr>" << endl;
01853     *out << "<tr class=\"special\">" << endl;
01854       *out << "<td colspan=2>" << endl;
01855       *out << "SM cfg string" << endl;
01856       *out << "</td>" << endl;
01857     *out << "  </tr>" << endl;
01858     *out << "<tr>"                                           << endl;
01859       *out << " <td colspan=2>"                                      << endl;
01860       *out << "<textarea rows=" << 10 << " cols=100 scroll=yes";
01861       *out << " readonly title=\"SM config\">"               << endl;
01862       *out << smConfigString_                                  << endl;
01863       *out << "</textarea>"                                          << endl;
01864       *out << " </td>"                                       << endl;
01865     *out << "</tr>"                                          << endl;
01867   *out << "</table>" << endl;
01869   *out << "  </td>"                                                  << endl;
01870   *out << "</table>"                                                 << endl;
01871   //---- separate pages for RB senders and Streamer Output
01872   *out << "<hr/>"                                                 << endl;
01873   std::string url = getApplicationDescriptor()->getContextDescriptor()->getURL();
01874   std::string urn = getApplicationDescriptor()->getURN();
01875   *out << "<a href=\"" << url << "/" << urn << "/rbsenderlist" << "\">" 
01876        << "RB Sender list web page" << "</a>" << endl;
01877   *out << "<hr/>"                                                 << endl;
01878   *out << "<a href=\"" << url << "/" << urn << "/streameroutput" << "\">" 
01879        << "Streamer Output Status web page" << "</a>" << endl;
01880   *out << "<hr/>"                                                 << endl;
01881   *out << "<a href=\"" << url << "/" << urn << "/EventServerStats?update=off"
01882        << "\">Event Server Statistics" << "</a>" << endl;
01883   /* --- leave these here to debug event server problems
01884   *out << "<a href=\"" << url << "/" << urn << "/geteventdata" << "\">" 
01885        << "Get an event via a web page" << "</a>" << endl;
01886   *out << "<hr/>"                                                 << endl;
01887   *out << "<a href=\"" << url << "/" << urn << "/getregdata" << "\">" 
01888        << "Get a header via a web page" << "</a>" << endl;
01889   */
01891   *out << "</body>"                                                  << endl;
01892   *out << "</html>"                                                  << endl;
01893 }
01897 void StorageManager::rbsenderWebPage(xgi::Input *in, xgi::Output *out)
01898   throw (xgi::exception::Exception)
01899 {
01900   *out << "<html>"                                                   << endl;
01901   *out << "<head>"                                                   << endl;
01902   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
01903   *out << " href=\"/" <<  getApplicationDescriptor()->getURN()
01904        << "/styles.css\"/>"                   << endl;
01905   *out << "<title>" << getApplicationDescriptor()->getClassName() << " instance "
01906        << getApplicationDescriptor()->getInstance()
01907        << "</title>"     << endl;
01908   *out << "</head><body>"                                            << endl;
01909     *out << "<table border=\"0\" width=\"100%\">"                      << endl;
01910     *out << "<tr>"                                                     << endl;
01911     *out << "  <td align=\"left\">"                                    << endl;
01912     *out << "    <img"                                                 << endl;
01913     *out << "     align=\"middle\""                                    << endl;
01914     *out << "     src=\"/rubuilder/fu/images/fu64x64.gif\""     << endl;
01915     *out << "     alt=\"main\""                                        << endl;
01916     *out << "     width=\"64\""                                        << endl;
01917     *out << "     height=\"64\""                                       << endl;
01918     *out << "     border=\"\"/>"                                       << endl;
01919     *out << "    <b>"                                                  << endl;
01920     *out << getApplicationDescriptor()->getClassName() << " instance "
01921          << getApplicationDescriptor()->getInstance()                  << endl;
01922     *out << "      " << fsm_.stateName()->toString()                   << endl;
01923     *out << "    </b>"                                                 << endl;
01924     *out << "  </td>"                                                  << endl;
01925     *out << "  <td width=\"32\">"                                      << endl;
01926     *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
01927     *out << "      <img"                                               << endl;
01928     *out << "       align=\"middle\""                                  << endl;
01929     *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""    << endl;
01930     *out << "       alt=\"HyperDAQ\""                                  << endl;
01931     *out << "       width=\"32\""                                      << endl;
01932     *out << "       height=\"32\""                                      << endl;
01933     *out << "       border=\"\"/>"                                     << endl;
01934     *out << "    </a>"                                                 << endl;
01935     *out << "  </td>"                                                  << endl;
01936     *out << "  <td width=\"32\">"                                      << endl;
01937     *out << "  </td>"                                                  << endl;
01938     *out << "  <td width=\"32\">"                                      << endl;
01939     *out << "    <a href=\"/" << getApplicationDescriptor()->getURN()
01940          << "/debug\">"                   << endl;
01941     *out << "      <img"                                               << endl;
01942     *out << "       align=\"middle\""                                  << endl;
01943     *out << "       src=\"/rubuilder/fu/images/debug32x32.gif\""         << endl;
01944     *out << "       alt=\"debug\""                                     << endl;
01945     *out << "       width=\"32\""                                      << endl;
01946     *out << "       height=\"32\""                                     << endl;
01947     *out << "       border=\"\"/>"                                     << endl;
01948     *out << "    </a>"                                                 << endl;
01949     *out << "  </td>"                                                  << endl;
01950     *out << "</tr>"                                                    << endl;
01951     if(fsm_.stateName()->value_ == "Failed")
01952     {
01953       *out << "<tr>"                                         << endl;
01954       *out << " <td>"                                        << endl;
01955       *out << "<textarea rows=" << 5 << " cols=60 scroll=yes";
01956       *out << " readonly title=\"Reason For Failed\">"               << endl;
01957       *out << reasonForFailedState_                                  << endl;
01958       *out << "</textarea>"                                          << endl;
01959       *out << " </td>"                                       << endl;
01960       *out << "</tr>"                                        << endl;
01961     }
01962     *out << "</table>"                                                 << endl;
01964   *out << "<hr/>"                                                    << endl;
01966 // now for RB sender list statistics
01967   *out << "<table>"                                                  << endl;
01968   *out << "<tr valign=\"top\">"                                      << endl;
01969   *out << "  <td>"                                                   << endl;
01971   *out << "<table frame=\"void\" rules=\"groups\" class=\"states\">" << endl;
01972   *out << "<colgroup> <colgroup align=\"rigth\">"                    << endl;
01973     *out << "  <tr>"                                                   << endl;
01974     *out << "    <th colspan=2>"                                       << endl;
01975     *out << "      " << "RBxFUxOM Sender List"                            << endl;
01976     *out << "    </th>"                                                << endl;
01977     *out << "  </tr>"                                                  << endl;
01979     *out << "<tr>" << endl;
01980     *out << "<th >" << endl;
01981     *out << "Parameter" << endl;
01982     *out << "</th>" << endl;
01983     *out << "<th>" << endl;
01984     *out << "Value" << endl;
01985     *out << "</th>" << endl;
01986     *out << "</tr>" << endl;
01987         *out << "<tr>" << endl;
01988           *out << "<td >" << endl;
01989           *out << "Number of RB Senders" << endl;
01990           *out << "</td>" << endl;
01991           *out << "<td>" << endl;
01992           *out << smrbsenders_.numberOfRB() << endl;
01993           *out << "</td>" << endl;
01994         *out << "  </tr>" << endl;
01995         *out << "<tr>" << endl;
01996           *out << "<td >" << endl;
01997           *out << "Number of OM per FU" << endl;
01998           *out << "</td>" << endl;
01999           *out << "<td>" << endl;
02000           *out << smrbsenders_.numberOfOM() << endl;
02001           *out << "</td>" << endl;
02002         *out << "  </tr>" << endl;
02003         *out << "<tr>" << endl;
02004           *out << "<td >" << endl;
02005           *out << "Number in list of senders" << endl;
02006           *out << "</td>" << endl;
02007           *out << "<td>" << endl;
02008           *out << smrbsenders_.size() << endl;
02009           *out << "</td>" << endl;
02010         *out << "  </tr>" << endl;
02011     std::vector<boost::shared_ptr<SMFUSenderStats> > vrbstats = smrbsenders_.getSenderStats();
02012     if(!vrbstats.empty()) {
02013       for(vector<boost::shared_ptr<SMFUSenderStats> >::iterator pos = vrbstats.begin();
02014           pos != vrbstats.end(); ++pos)
02015       {
02016         *out << "<tr>" << endl;
02017           *out << "<td >" << endl;
02018           *out << "RB Sender URL" << endl;
02019           *out << "</td>" << endl;
02020           *out << "<td align=right>" << endl;
02021           char hlturl[MAX_I2O_SM_URLCHARS];
02022           copy(&(((*pos)->hltURL_)->at(0)), 
02023                &(((*pos)->hltURL_)->at(0)) + ((*pos)->hltURL_)->size(),
02024                hlturl);
02025           hlturl[((*pos)->hltURL_)->size()] = '\0';
02026           *out << hlturl << endl;
02027           *out << "</td>" << endl;
02028         *out << "  </tr>" << endl;
02029         *out << "<tr>" << endl;
02030           *out << "<td >" << endl;
02031           *out << "RB Sender Class Name" << endl;
02032           *out << "</td>" << endl;
02033           *out << "<td align=right>" << endl;
02034           char hltclass[MAX_I2O_SM_URLCHARS];
02035           copy(&(((*pos)->hltClassName_)->at(0)), 
02036                &(((*pos)->hltClassName_)->at(0)) + ((*pos)->hltClassName_)->size(),
02037                hltclass);
02038           hltclass[((*pos)->hltClassName_)->size()] = '\0';
02039           *out << hltclass << endl;
02040           *out << "</td>" << endl;
02041         *out << "  </tr>" << endl;
02042         *out << "<tr>" << endl;
02043           *out << "<td >" << endl;
02044           *out << "RB Sender Instance" << endl;
02045           *out << "</td>" << endl;
02046           *out << "<td align=right>" << endl;
02047           *out << (*pos)->hltInstance_ << endl;
02048           *out << "</td>" << endl;
02049         *out << "  </tr>" << endl;
02050         *out << "<tr>" << endl;
02051           *out << "<td >" << endl;
02052           *out << "RB Sender Local ID" << endl;
02053           *out << "</td>" << endl;
02054           *out << "<td align=right>" << endl;
02055           *out << (*pos)->hltLocalId_ << endl;
02056           *out << "</td>" << endl;
02057         *out << "  </tr>" << endl;
02058         *out << "<tr>" << endl;
02059           *out << "<td >" << endl;
02060           *out << "RB Sender Tid" << endl;
02061           *out << "</td>" << endl;
02062           *out << "<td align=right>" << endl;
02063           *out << (*pos)->hltTid_ << endl;
02064           *out << "</td>" << endl;
02065         *out << "  </tr>" << endl;
02066         *out << "<tr>" << endl;
02067           *out << "<td >" << endl;
02068           *out << "FU Sender id (shared memory id!)" << endl;
02069           *out << "</td>" << endl;
02070           *out << "<td align=right>" << endl;
02071           *out << (*pos)->rbBufferID_ << endl;
02072           *out << "</td>" << endl;
02073         *out << "  </tr>" << endl;
02074         *out << "<tr>" << endl;
02075           *out << "<td >" << endl;
02076           *out << "Number of registries received (output modules)" << endl;
02077           *out << "</td>" << endl;
02078           *out << "<td align=right>" << endl;
02079           *out << (*pos)->registryCollection_.outModName_.size() << endl;
02080           *out << "</td>" << endl;
02081         *out << "  </tr>" << endl;
02082         *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"2\"></td></tr>" << endl;
02083         // Loop over number of registries
02084         if(!(*pos)->registryCollection_.outModName_.empty()) {
02085           for(vector<std::string>::iterator idx = (*pos)->registryCollection_.outModName_.begin();
02086               idx != (*pos)->registryCollection_.outModName_.end(); ++idx)
02087           {
02088             *out << "<tr>" << endl;
02089               *out << "<td >" << endl;
02090               *out << "Output Module Name" << endl;
02091               *out << "</td>" << endl;
02092               *out << "<td align=right>" << endl;
02093               *out << (*idx) << endl;
02094               *out << "</td>" << endl;
02095             *out << "  </tr>" << endl;
02096             *out << "<tr>" << endl;
02097               *out << "<td >" << endl;
02098               *out << "Output Module Id" << endl;
02099               *out << "</td>" << endl;
02100               *out << "<td align=right>" << endl;
02101               *out << (*pos)->registryCollection_.outModName2ModId_[*idx] << endl;
02102               *out << "</td>" << endl;
02103             *out << "  </tr>" << endl;
02104             *out << "<tr>" << endl;
02105               *out << "<td >" << endl;
02106               *out << "Product registry size (bytes)" << endl;
02107               *out << "</td>" << endl;
02108               *out << "<td align=right>" << endl;
02109               *out << (*pos)->registryCollection_.registrySizeMap_[*idx] << endl;
02110               *out << "</td>" << endl;
02111             *out << "  </tr>" << endl;
02112             *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"2\"></td></tr>" << endl;
02113           }
02114         }
02115         *out << "<tr>" << endl;
02116           *out << "<td>" << endl;
02117           *out << "Connection Status" << endl;
02118           *out << "</td>" << endl;
02119           *out << "<td align=right>" << endl;
02120           *out << (*pos)->connectStatus_ << endl;
02121           *out << "</td>" << endl;
02122         *out << "  </tr>" << endl;
02123         if((*pos)->connectStatus_ > 1) {
02124           *out << "<tr>" << endl;
02125             *out << "<td >" << endl;
02126             *out << "Time since last data frame (ms)" << endl;
02127             *out << "</td>" << endl;
02128             *out << "<td align=right>" << endl;
02129             *out << (*pos)->timeWaited_ << endl;
02130             *out << "</td>" << endl;
02131           *out << "  </tr>" << endl;
02132           *out << "<tr>" << endl;
02133             *out << "<td >" << endl;
02134             *out << "Run number" << endl;
02135             *out << "</td>" << endl;
02136             *out << "<td align=right>" << endl;
02137             *out << (*pos)->runNumber_ << endl;
02138             *out << "</td>" << endl;
02139           *out << "  </tr>" << endl;
02140           *out << "<tr>" << endl;
02141             *out << "<td >" << endl;
02142             *out << "Running locally" << endl;
02143             *out << "</td>" << endl;
02144             *out << "<td align=right>" << endl;
02145             if((*pos)->isLocal_) {
02146               *out << "Yes" << endl;
02147             } else {
02148               *out << "No" << endl;
02149             }
02150             *out << "</td>" << endl;
02151           *out << "  </tr>" << endl;
02152           *out << "<tr>" << endl;
02153             *out << "<td >" << endl;
02154             *out << "Frames received" << endl;
02155             *out << "</td>" << endl;
02156             *out << "<td align=right>" << endl;
02157             *out << (*pos)->framesReceived_ << endl;
02158             *out << "</td>" << endl;
02159           *out << "  </tr>" << endl;
02160           *out << "<tr>" << endl;
02161             *out << "<td >" << endl;
02162             *out << "Events received" << endl;
02163             *out << "</td>" << endl;
02164             *out << "<td align=right>" << endl;
02165             *out << (*pos)->eventsReceived_ << endl;
02166             *out << "</td>" << endl;
02167           *out << "  </tr>" << endl;
02168           *out << "<tr>" << endl;
02169             *out << "<td >" << endl;
02170             *out << "Total Bytes received" << endl;
02171             *out << "</td>" << endl;
02172             *out << "<td align=right>" << endl;
02173             *out << (*pos)->totalSizeReceived_ << endl;
02174             *out << "</td>" << endl;
02175           *out << "  </tr>" << endl;
02176           *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"2\"></td></tr>" << endl;
02177           // Loop over number of output modules
02178           if(!(*pos)->registryCollection_.outModName_.empty()) {
02179             for(vector<std::string>::iterator idx = (*pos)->registryCollection_.outModName_.begin();
02180                 idx != (*pos)->registryCollection_.outModName_.end(); ++idx)
02181             {
02182               *out << "<tr>" << endl;
02183                 *out << "<td >" << endl;
02184                 *out << "Output Module Name" << endl;
02185                 *out << "</td>" << endl;
02186                 *out << "<td align=right>" << endl;
02187                 *out << (*idx) << endl;
02188                 *out << "</td>" << endl;
02189               *out << "  </tr>" << endl;
02190               *out << "<tr>" << endl;
02191                 *out << "<td >" << endl;
02192                 *out << "Frames received" << endl;
02193                 *out << "</td>" << endl;
02194                 *out << "<td align=right>" << endl;
02195                 *out << (*pos)->datCollection_.framesReceivedMap_[*idx] << endl;
02196                 *out << "</td>" << endl;
02197               *out << "  </tr>" << endl;
02198               *out << "<tr>" << endl;
02199                 *out << "<td >" << endl;
02200                 *out << "Events received" << endl;
02201                 *out << "</td>" << endl;
02202                 *out << "<td align=right>" << endl;
02203                 *out << (*pos)->datCollection_.eventsReceivedMap_[*idx] << endl;
02204                 *out << "</td>" << endl;
02205               *out << "  </tr>" << endl;
02206               *out << "<tr>" << endl;
02207                 *out << "<td >" << endl;
02208                 *out << "Total Bytes received" << endl;
02209                 *out << "</td>" << endl;
02210                 *out << "<td align=right>" << endl;
02211                 *out << (*pos)->datCollection_.totalSizeReceivedMap_[*idx] << endl;
02212                 *out << "</td>" << endl;
02213               *out << "  </tr>" << endl;
02214               *out << "<tr><td bgcolor=\"#999933\" height=\"1\" colspan=\"2\"></td></tr>" << endl;
02215             }
02216           }
02217           if((*pos)->eventsReceived_ > 0) {
02218             *out << "<tr>" << endl;
02219               *out << "<td >" << endl;
02220               *out << "Last frame latency (us)" << endl;
02221               *out << "</td>" << endl;
02222               *out << "<td align=right>" << endl;
02223               *out << (*pos)->lastLatency_ << endl;
02224               *out << "</td>" << endl;
02225             *out << "  </tr>" << endl;
02226             *out << "<tr>" << endl;
02227               *out << "<td >" << endl;
02228               *out << "Average event size (Bytes)" << endl;
02229               *out << "</td>" << endl;
02230               *out << "<td align=right>" << endl;
02231               *out << (*pos)->totalSizeReceived_/(*pos)->eventsReceived_ << endl;
02232               *out << "</td>" << endl;
02233               *out << "<tr>" << endl;
02234                 *out << "<td >" << endl;
02235                 *out << "Last Run Number" << endl;
02236                 *out << "</td>" << endl;
02237                 *out << "<td align=right>" << endl;
02238                 *out << (*pos)->lastRunID_ << endl;
02239                 *out << "</td>" << endl;
02240               *out << "  </tr>" << endl;
02241               *out << "<tr>" << endl;
02242                 *out << "<td >" << endl;
02243                 *out << "Last Event Number" << endl;
02244                 *out << "</td>" << endl;
02245                 *out << "<td align=right>" << endl;
02246                 *out << (*pos)->lastEventID_ << endl;
02247                 *out << "</td>" << endl;
02248               *out << "  </tr>" << endl;
02249             } // events received endif
02250           *out << "  </tr>" << endl;
02251           *out << "<tr>" << endl;
02252             *out << "<td >" << endl;
02253             *out << "Total out of order frames" << endl;
02254             *out << "</td>" << endl;
02255             *out << "<td align=right>" << endl;
02256             *out << (*pos)->totalOutOfOrder_ << endl;
02257             *out << "</td>" << endl;
02258           *out << "  </tr>" << endl;
02259           *out << "<tr>" << endl;
02260             *out << "<td >" << endl;
02261             *out << "Total Bad Events" << endl;
02262             *out << "</td>" << endl;
02263             *out << "<td align=right>" << endl;
02264             *out << (*pos)->totalBadEvents_ << endl;
02265             *out << "</td>" << endl;
02266           *out << "  </tr>" << endl;
02267         } // connect status endif
02268       } // Sender list loop
02269     } //sender size test endif
02271   *out << "</table>" << endl;
02273   *out << "  </td>"                                                  << endl;
02274   *out << "</table>"                                                 << endl;
02276   *out << "</body>"                                                  << endl;
02277   *out << "</html>"                                                  << endl;
02278 }
02282 void StorageManager::streamerOutputWebPage(xgi::Input *in, xgi::Output *out)
02283   throw (xgi::exception::Exception)
02284 {
02285   *out << "<html>"                                                   << endl;
02286   *out << "<head>"                                                   << endl;
02287   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
02288   *out << " href=\"/" <<  getApplicationDescriptor()->getURN()
02289        << "/styles.css\"/>"                   << endl;
02290   *out << "<title>" << getApplicationDescriptor()->getClassName() << " instance "
02291        << getApplicationDescriptor()->getInstance()
02292        << "</title>"     << endl;
02293   *out << "</head><body>"                                            << endl;
02294     *out << "<table border=\"0\" width=\"100%\">"                      << endl;
02295     *out << "<tr>"                                                     << endl;
02296     *out << "  <td align=\"left\">"                                    << endl;
02297     *out << "    <img"                                                 << endl;
02298     *out << "     align=\"middle\""                                    << endl;
02299     *out << "     src=\"/rubuilder/fu/images/fu64x64.gif\""     << endl;
02300     *out << "     alt=\"main\""                                        << endl;
02301     *out << "     width=\"64\""                                        << endl;
02302     *out << "     height=\"64\""                                       << endl;
02303     *out << "     border=\"\"/>"                                       << endl;
02304     *out << "    <b>"                                                  << endl;
02305     *out << getApplicationDescriptor()->getClassName() << " instance "
02306          << getApplicationDescriptor()->getInstance()                  << endl;
02307     *out << "      " << fsm_.stateName()->toString()                   << endl;
02308     *out << "    </b>"                                                 << endl;
02309     *out << "  </td>"                                                  << endl;
02310     *out << "  <td width=\"32\">"                                      << endl;
02311     *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
02312     *out << "      <img"                                               << endl;
02313     *out << "       align=\"middle\""                                  << endl;
02314     *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""    << endl;
02315     *out << "       alt=\"HyperDAQ\""                                  << endl;
02316     *out << "       width=\"32\""                                      << endl;
02317     *out << "       height=\"32\""                                      << endl;
02318     *out << "       border=\"\"/>"                                     << endl;
02319     *out << "    </a>"                                                 << endl;
02320     *out << "  </td>"                                                  << endl;
02321     *out << "  <td width=\"32\">"                                      << endl;
02322     *out << "  </td>"                                                  << endl;
02323     *out << "  <td width=\"32\">"                                      << endl;
02324     *out << "    <a href=\"/" << getApplicationDescriptor()->getURN()
02325          << "/debug\">"                   << endl;
02326     *out << "      <img"                                               << endl;
02327     *out << "       align=\"middle\""                                  << endl;
02328     *out << "       src=\"/rubuilder/fu/images/debug32x32.gif\""       << endl;
02329     *out << "       alt=\"debug\""                                     << endl;
02330     *out << "       width=\"32\""                                      << endl;
02331     *out << "       height=\"32\""                                     << endl;
02332     *out << "       border=\"\"/>"                                     << endl;
02333     *out << "    </a>"                                                 << endl;
02334     *out << "  </td>"                                                  << endl;
02335     *out << "</tr>"                                                    << endl;
02336     if(fsm_.stateName()->value_ == "Failed")
02337     {
02338       *out << "<tr>"                                         << endl;
02339       *out << " <td>"                                        << endl;
02340       *out << "<textarea rows=" << 5 << " cols=60 scroll=yes";
02341       *out << " readonly title=\"Reason For Failed\">"               << endl;
02342       *out << reasonForFailedState_                                  << endl;
02343       *out << "</textarea>"                                          << endl;
02344       *out << " </td>"                                       << endl;
02345       *out << "</tr>"                                        << endl;
02346     }
02347     *out << "</table>"                                                 << endl;
02349     *out << "<hr/>"                                                    << endl;
02351     // should first test if jc_ is valid
02352     if(jc_.get() != NULL && jc_->getInitMsgCollection().get() != NULL &&
02353        jc_->getInitMsgCollection()->size() > 0) {
02354       boost::mutex::scoped_lock sl(halt_lock_);
02355       if(jc_.use_count() != 0) {
02356         std::list<std::string>& files = jc_->get_filelist();
02357         if(files.size() > 0 )
02358           {
02359             if(files.size() > 249 )
02360               *out << "<P>250 last files (most recent first):</P>\n" << endl;
02361             else 
02362               *out << "<P>Files (most recent first):</P>\n" << endl;
02363             *out << "<pre># pathname nevts size" << endl;
02364             int c=0;
02365             for(list<string>::reverse_iterator it = files.rbegin(); it != files.rend(); ++it) {
02366               *out <<*it << endl;
02367               ++c;
02368               if(c>249) break;
02369             }
02370           }
02371       }
02372     }
02374   *out << "</body>"                                                  << endl;
02375   *out << "</html>"                                                  << endl;
02376 }
02380 void StorageManager::eventdataWebPage(xgi::Input *in, xgi::Output *out)
02381   throw (xgi::exception::Exception)
02382 {
02383   // default the message length to zero
02384   int len=0;
02386   // determine the consumer ID from the event request
02387   // message, if it is available.
02388   unsigned int consumerId = 0;
02389   int consumerInitMsgCount = -1;
02390   std::string lengthString = in->getenv("CONTENT_LENGTH");
02391   unsigned long contentLength = std::atol(lengthString.c_str());
02392   if (contentLength > 0) 
02393     {
02394       auto_ptr< vector<char> > bufPtr(new vector<char>(contentLength));
02395       in->read(&(*bufPtr)[0], contentLength);
02396       OtherMessageView requestMessage(&(*bufPtr)[0]);
02397       if (requestMessage.code() == Header::EVENT_REQUEST)
02398         {
02399           uint8 *bodyPtr = requestMessage.msgBody();
02400           consumerId = convert32(bodyPtr);
02401           if (requestMessage.bodySize() >= (2 * sizeof(char_uint32)))
02402             {
02403               bodyPtr += sizeof(char_uint32);
02404               consumerInitMsgCount = convert32(bodyPtr);
02405             }
02406         }
02407     }
02409   // first test if StorageManager is in Enabled state and registry is filled
02410   // this must be the case for valid data to be present
02411   if(fsm_.stateName()->toString() == "Enabled" && jc_.get() != NULL &&
02412      jc_->getInitMsgCollection().get() != NULL &&
02413      jc_->getInitMsgCollection()->size() > 0)
02414   {
02415     boost::shared_ptr<EventServer> eventServer = jc_->getEventServer();
02416     if (eventServer.get() != NULL)
02417     {
02418       // if we've stored a "registry warning" in the consumer pipe, send
02419       // that instead of an event so that the consumer can react to
02420       // the warning
02421       boost::shared_ptr<ConsumerPipe> consPtr =
02422         eventServer->getConsumer(consumerId);
02423       if (consPtr.get() != NULL && consPtr->hasRegistryWarning())
02424       {
02425         std::vector<char> registryWarning = consPtr->getRegistryWarning();
02426         const char* from = &registryWarning[0];
02427         unsigned int msize = registryWarning.size();
02428         if(mybuffer_.capacity() < msize) mybuffer_.resize(msize);
02429         unsigned char* pos = (unsigned char*) &mybuffer_[0];
02431         copy(from,from+msize,pos);
02432         len = msize;
02433         consPtr->clearRegistryWarning();
02434       }
02435       // if the consumer is an instance of the proxy server and
02436       // it knows about fewer INIT messages than we do, tell it
02437       // that new INIT message(s) are available
02438       else if (consPtr.get() != NULL && consPtr->isProxyServer() &&
02439                consumerInitMsgCount >= 0 &&
02440                jc_->getInitMsgCollection()->size() > consumerInitMsgCount)
02441       {
02442         OtherMessageBuilder othermsg(&mybuffer_[0],
02443                                      Header::NEW_INIT_AVAILABLE);
02444         len = othermsg.size();
02445       }
02446       // otherwise try to send an event
02447       else
02448       {
02449         boost::shared_ptr< std::vector<char> > bufPtr =
02450           eventServer->getEvent(consumerId);
02451         if (bufPtr.get() != NULL)
02452         {
02453           EventMsgView msgView(&(*bufPtr)[0]);
02455           unsigned char* from = msgView.startAddress();
02456           unsigned int dsize = msgView.size();
02457           if(mybuffer_.capacity() < dsize) mybuffer_.resize(dsize);
02458           unsigned char* pos = (unsigned char*) &mybuffer_[0];
02460           copy(from,from+dsize,pos);
02461           len = dsize;
02462           FDEBUG(10) << "sending event " << msgView.event() << std::endl;
02463         }
02464       }
02465     }
02467     out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
02468     out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
02469     out->write((char*) &mybuffer_[0],len);
02470   } // else send end of run as reponse
02471   else
02472     {
02473       OtherMessageBuilder othermsg(&mybuffer_[0],Header::DONE);
02474       len = othermsg.size();
02475       //std::cout << "making other message code = " << othermsg.code()
02476       //          << " and size = " << othermsg.size() << std::endl;
02478       out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
02479       out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
02480       out->write((char*) &mybuffer_[0],len);
02481     }
02483   // How to block if there is no data
02484   // How to signal if end, and there will be no more data?
02486 }
02490 void StorageManager::headerdataWebPage(xgi::Input *in, xgi::Output *out)
02491   throw (xgi::exception::Exception)
02492 {
02493   unsigned int len = 0;
02495   // determine the consumer ID from the header request
02496   // message, if it is available.
02497   auto_ptr< vector<char> > httpsPostData;
02498   unsigned int consumerId = 0;
02499   std::string lengthString = in->getenv("CONTENT_LENGTH");
02500   unsigned long contentLength = std::atol(lengthString.c_str());
02501   if (contentLength > 0) {
02502     auto_ptr< vector<char> > bufPtr(new vector<char>(contentLength));
02503     in->read(&(*bufPtr)[0], contentLength);
02504     OtherMessageView requestMessage(&(*bufPtr)[0]);
02505     if (requestMessage.code() == Header::HEADER_REQUEST)
02506     {
02507       uint8 *bodyPtr = requestMessage.msgBody();
02508       consumerId = convert32(bodyPtr);
02509     }
02511     // save the post data for use outside the "if" block scope in case it is
02512     // useful later (it will still get deleted at the end of the method)
02513     httpsPostData = bufPtr;
02514   }
02516   // first test if StorageManager is in Enabled state and registry is filled
02517   // this must be the case for valid data to be present
02518   if(fsm_.stateName()->toString() == "Enabled" && jc_.get() != NULL &&
02519      jc_->getInitMsgCollection().get() != NULL &&
02520      jc_->getInitMsgCollection()->size() > 0)
02521     {
02522       std::string errorString;
02523       InitMsgSharedPtr serializedProds;
02524       boost::shared_ptr<EventServer> eventServer = jc_->getEventServer();
02525       if (eventServer.get() != NULL)
02526       {
02527         boost::shared_ptr<ConsumerPipe> consPtr =
02528           eventServer->getConsumer(consumerId);
02529         if (consPtr.get() != NULL)
02530         {
02531           // limit this (and other) interaction with the InitMsgCollection
02532           // to a single thread so that we can present a coherent
02533           // picture to consumers
02534           boost::mutex::scoped_lock sl(consumerInitMsgLock_);
02535           boost::shared_ptr<InitMsgCollection> initMsgCollection =
02536             jc_->getInitMsgCollection();
02538           try
02539           {
02540             if (consPtr->isProxyServer())
02541             {
02542               // If the INIT message collection has more than one element,
02543               // we build up a special response message that contains all
02544               // of the INIT messages in the collection (code = INIT_SET).
02545               // We can use an InitMsgBuffer to do this (and assign it
02546               // to the serializedProds local variable) because it
02547               // is really just a vector of char (it doesn't have any
02548               // internal structure that limits it to being used just for
02549               // single INIT messages).
02550               if (initMsgCollection->size() > 1)
02551               {
02552                 serializedProds = initMsgCollection->getFullCollection();
02553               }
02554               else
02555               {
02556                 serializedProds = initMsgCollection->getLastElement();
02557               }
02558             }
02559             else
02560             {
02561               std::string hltOMLabel = consPtr->getHLTOutputSelection();
02562               serializedProds =
02563                 initMsgCollection->getElementForOutputModule(hltOMLabel);
02564             }
02565             if (serializedProds.get() != NULL)
02566             {
02567               uint8* regPtr = &(*serializedProds)[0];
02568               HeaderView hdrView(regPtr);
02570               // if the response that we're sending is an INIT_SET rather
02571               // than a single INIT message, we simply use the first INIT
02572               // message in the collection to initialize the local 
02573               // ConsumerPipe.  Since all we need is the
02574               // full trigger list, any of the INIT messages should be fine
02575               // (because all of them should have the same full trigger list).
02576               if (hdrView.code() == Header::INIT_SET) {
02577                 OtherMessageView otherView(&(*serializedProds)[0]);
02578                 regPtr = otherView.msgBody();
02579               }
02581               Strings triggerNameList;
02582               InitMsgView initView(regPtr);
02583               initView.hltTriggerNames(triggerNameList);
02585               uint32 outputModuleId;
02586               if (initView.protocolVersion() >= 6) {
02587                 outputModuleId = initView.outputModuleId();
02588               }
02589               else {
02590                 std::string moduleLabel = initView.outputModuleLabel();
02591                 uLong crc = crc32(0L, Z_NULL, 0);
02592                 Bytef* crcbuf = (Bytef*);
02593                 crc = crc32(crc, crcbuf, moduleLabel.length());
02594                 outputModuleId = static_cast<uint32>(crc);
02595               }
02596               consPtr->initializeSelection(triggerNameList,
02597                                            outputModuleId);
02598             }
02599           }
02600           catch (const edm::Exception& excpt)
02601           {
02602             errorString = excpt.what();
02603           }
02604           catch (const cms::Exception& excpt)
02605           {
02606             //errorString.append(excpt.what());
02607             errorString.append("ERROR: The configuration for this ");
02608             errorString.append("consumer does not specify an HLT output ");
02609             errorString.append("module.\nPlease specify one of the HLT ");
02610             errorString.append("output modules listed below as the ");
02611             errorString.append("SelectHLTOutput parameter ");
02612             errorString.append("in the InputSource configuration.\n");
02613             errorString.append(initMsgCollection->getSelectionHelpString());
02614             errorString.append("\n");
02615           }
02616         }
02617       }
02618       if (errorString.length() > 0) {
02619         len = errorString.length();
02620       }
02621       else if (serializedProds.get() != NULL) {
02622         len = serializedProds->size();
02623       }
02624       else {
02625         len = 0;
02626       }
02627       if (mybuffer_.capacity() < len) mybuffer_.resize(len);
02628       if (errorString.length() > 0) {
02629         const char *errorBytes = errorString.c_str();
02630         for (unsigned int i=0; i<len; ++i) mybuffer_[i]=errorBytes[i];
02631       }
02632       else if (serializedProds.get() != NULL) {
02633         for (unsigned int i=0; i<len; ++i) mybuffer_[i]=(*serializedProds)[i];
02634       }
02635     }
02637   out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
02638   out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
02639   out->write((char*) &mybuffer_[0],len);
02641   // How to block if there is no header data
02642   // How to signal if not yet started, so there is no registry yet?
02643 }
02645 void StorageManager::consumerListWebPage(xgi::Input *in, xgi::Output *out)
02646   throw (xgi::exception::Exception)
02647 {
02648   char buffer[65536];
02650   out->getHTTPResponseHeader().addHeader("Content-Type", "application/xml");
02651   sprintf(buffer,
02652           "<?xml version=\"1.0\" encoding=\"iso-8859-1\"?>\n<Monitor>\n");
02653   out->write(buffer,strlen(buffer));
02655   if(fsm_.stateName()->toString() == "Enabled")
02656   {
02657     sprintf(buffer, "<ConsumerList>\n");
02658     out->write(buffer,strlen(buffer));
02660     boost::shared_ptr<EventServer> eventServer;
02661     if (jc_.get() != NULL)
02662     {
02663       eventServer = jc_->getEventServer();
02664     }
02665     if (eventServer.get() != NULL)
02666     {
02667       std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 
02668         eventServer->getConsumerTable();
02669       std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 
02670         consumerIter;
02671       for (consumerIter = consumerTable.begin();
02672            consumerIter != consumerTable.end();
02673            ++consumerIter)
02674       {
02675         boost::shared_ptr<ConsumerPipe> consumerPipe = consumerIter->second;
02676         sprintf(buffer, "<Consumer>\n");
02677         out->write(buffer,strlen(buffer));
02679         if (consumerPipe->isProxyServer()) {
02680           sprintf(buffer, "<Name>Proxy Server</Name>\n");
02681         }
02682         else {
02683           sprintf(buffer, "<Name>%s</Name>\n",
02684                   consumerPipe->getConsumerName().c_str());
02685         }
02686         out->write(buffer,strlen(buffer));
02688         sprintf(buffer, "<ID>%d</ID>\n", consumerPipe->getConsumerId());
02689         out->write(buffer,strlen(buffer));
02691         sprintf(buffer, "<Time>%d</Time>\n", 
02692                 (int)consumerPipe->getLastEventRequestTime());
02693         out->write(buffer,strlen(buffer));
02695         sprintf(buffer, "<Host>%s</Host>\n", 
02696                 consumerPipe->getHostName().c_str());
02697         out->write(buffer,strlen(buffer));
02699         sprintf(buffer, "<Events>%d</Events>\n", consumerPipe->getEvents());
02700         out->write(buffer,strlen(buffer));
02702         sprintf(buffer, "<Failed>%d</Failed>\n", 
02703                 consumerPipe->getPushEventFailures());
02704         out->write(buffer,strlen(buffer));
02706         sprintf(buffer, "<Idle>%d</Idle>\n", consumerPipe->isIdle());
02707         out->write(buffer,strlen(buffer));
02709         sprintf(buffer, "<Disconnected>%d</Disconnected>\n", 
02710                 consumerPipe->isDisconnected());
02711         out->write(buffer,strlen(buffer));
02713         sprintf(buffer, "<Ready>%d</Ready>\n", consumerPipe->isReadyForEvent());
02714         out->write(buffer,strlen(buffer));
02716         sprintf(buffer, "</Consumer>\n");
02717         out->write(buffer,strlen(buffer));
02718       }
02719     }
02720     boost::shared_ptr<DQMEventServer> dqmServer;
02721     if (jc_.get() != NULL)
02722     {
02723       dqmServer = jc_->getDQMEventServer();
02724     }
02725     if (dqmServer.get() != NULL)
02726     {
02727       std::map< uint32, boost::shared_ptr<DQMConsumerPipe> > dqmTable = 
02728         dqmServer->getConsumerTable();
02729       std::map< uint32, boost::shared_ptr<DQMConsumerPipe> >::const_iterator 
02730         dqmIter;
02731       for (dqmIter = dqmTable.begin();
02732            dqmIter != dqmTable.end();
02733            ++dqmIter)
02734       {
02735         boost::shared_ptr<DQMConsumerPipe> dqmPipe = dqmIter->second;
02736         sprintf(buffer, "<DQMConsumer>\n");
02737         out->write(buffer,strlen(buffer));
02739         if (dqmPipe->isProxyServer()) {
02740           sprintf(buffer, "<Name>Proxy Server</Name>\n");
02741         }
02742         else {
02743           sprintf(buffer, "<Name>%s</Name>\n",
02744                   dqmPipe->getConsumerName().c_str());
02745         }
02746         out->write(buffer,strlen(buffer));
02748         sprintf(buffer, "<ID>%d</ID>\n", dqmPipe->getConsumerId());
02749         out->write(buffer,strlen(buffer));
02751         sprintf(buffer, "<Time>%d</Time>\n", 
02752                 (int)dqmPipe->getLastEventRequestTime());
02753         out->write(buffer,strlen(buffer));
02755         sprintf(buffer, "<Host>%s</Host>\n", 
02756                 dqmPipe->getHostName().c_str());
02757         out->write(buffer,strlen(buffer));
02759         sprintf(buffer, "<Events>%d</Events>\n", dqmPipe->getEvents());
02760         out->write(buffer,strlen(buffer));
02762         sprintf(buffer, "<Failed>%d</Failed>\n", 
02763                 dqmPipe->getPushEventFailures());
02764         out->write(buffer,strlen(buffer));
02766         sprintf(buffer, "<Idle>%d</Idle>\n", dqmPipe->isIdle());
02767         out->write(buffer,strlen(buffer));
02769         sprintf(buffer, "<Disconnected>%d</Disconnected>\n", 
02770                 dqmPipe->isDisconnected());
02771         out->write(buffer,strlen(buffer));
02773         sprintf(buffer, "<Ready>%d</Ready>\n", dqmPipe->isReadyForEvent());
02774         out->write(buffer,strlen(buffer));
02776         sprintf(buffer, "</DQMConsumer>\n");
02777         out->write(buffer,strlen(buffer));
02778       }
02779     }
02780     sprintf(buffer, "</ConsumerList>\n");
02781     out->write(buffer,strlen(buffer));
02782   }
02783   sprintf(buffer, "</Monitor>");
02784   out->write(buffer,strlen(buffer));
02785 }
02788 void StorageManager::eventServerWebPage(xgi::Input *in, xgi::Output *out)
02789   throw (xgi::exception::Exception)
02790 {
02791   // We should make the HTML header and the page banner common
02792   std::string url =
02793     getApplicationDescriptor()->getContextDescriptor()->getURL();
02794   std::string urn = getApplicationDescriptor()->getURN();
02796   // determine whether we're automatically updating the page
02797   // --> if the SM is not enabled, assume that users want updating turned
02798   // --> ON so that they don't A) think that is is ON (when it's not) and
02799   // --> B) wait forever thinking that something is wrong.
02800   //bool autoUpdate = true;
02801   // 11-Jun-2008, KAB - changed auto update default to OFF
02802   bool autoUpdate = false;
02803   if(fsm_.stateName()->toString() == "Enabled") {
02804     cgicc::Cgicc cgiWrapper(in);
02805     cgicc::const_form_iterator updateRef = cgiWrapper.getElement("update");
02806     if (updateRef != cgiWrapper.getElements().end()) {
02807       std::string updateString =
02808         boost::algorithm::to_lower_copy(updateRef->getValue());
02809       if (updateString == "off") {
02810         autoUpdate = false;
02811       }
02812       else {
02813         autoUpdate = true;
02814       }
02815     }
02816   }
02818   *out << "<html>" << std::endl;
02819   *out << "<head>" << std::endl;
02820   if (autoUpdate) {
02821     *out << "<meta https-equiv=\"refresh\" content=\"10\">" << std::endl;
02822   }
02823   *out << "<link type=\"text/css\" rel=\"stylesheet\"";
02824   *out << " href=\"/" << urn << "/styles.css\"/>" << std::endl;
02825   *out << "<title>" << getApplicationDescriptor()->getClassName()
02826        << " Instance " << getApplicationDescriptor()->getInstance()
02827        << "</title>" << std::endl;
02828   *out << "<style type=\"text/css\">" << std::endl;
02829   *out << "  .noBotMarg {margin-bottom:0px;}" << std::endl;
02830   *out << "</style>" << std::endl;
02831   *out << "</head><body>" << std::endl;
02833   *out << "<table border=\"1\" width=\"100%\">"                      << endl;
02834   *out << "<tr>"                                                     << endl;
02835   *out << "  <td align=\"left\">"                                    << endl;
02836   *out << "    <img"                                                 << endl;
02837   *out << "     align=\"middle\""                                    << endl;
02838   *out << "     src=\"/evf/images/smicon.jpg\""                      << endl;
02839   *out << "     alt=\"main\""                                        << endl;
02840   *out << "     width=\"64\""                                        << endl;
02841   *out << "     height=\"64\""                                       << endl;
02842   *out << "     border=\"\"/>"                                       << endl;
02843   *out << "    <b>"                                                  << endl;
02844   *out << getApplicationDescriptor()->getClassName() << " Instance "
02845        << getApplicationDescriptor()->getInstance();
02846   *out << ", State is " << fsm_.stateName()->toString()              << endl;
02847   *out << "    </b>"                                                 << endl;
02848   *out << "  </td>"                                                  << endl;
02849   *out << "  <td width=\"32\">"                                      << endl;
02850   *out << "    <a href=\"/urn:xdaq-application:lid=3\">"             << endl;
02851   *out << "      <img"                                               << endl;
02852   *out << "       align=\"middle\""                                  << endl;
02853   *out << "       src=\"/hyperdaq/images/HyperDAQ.jpg\""             << endl;
02854   *out << "       alt=\"HyperDAQ\""                                  << endl;
02855   *out << "       width=\"32\""                                      << endl;
02856   *out << "       height=\"32\""                                     << endl;
02857   *out << "       border=\"\"/>"                                     << endl;
02858   *out << "    </a>"                                                 << endl;
02859   *out << "  </td>"                                                  << endl;
02860   *out << "</tr>"                                                    << endl;
02861   if(fsm_.stateName()->value_ == "Failed")
02862   {
02863     *out << "<tr>"                                                   << endl;
02864     *out << " <td>"                                                  << endl;
02865     *out << "<textarea rows=" << 5 << " cols=60 scroll=yes";
02866     *out << " readonly title=\"Reason For Failed\">"                 << endl;
02867     *out << reasonForFailedState_                                    << endl;
02868     *out << "</textarea>"                                            << endl;
02869     *out << " </td>"                                                 << endl;
02870     *out << "</tr>"                                                  << endl;
02871   }
02872   *out << "</table>"                                                 << endl;
02874   if(fsm_.stateName()->toString() == "Enabled")
02875   {
02876     boost::shared_ptr<EventServer> eventServer;
02877     boost::shared_ptr<InitMsgCollection> initMsgCollection;
02878     if (jc_.get() != NULL)
02879     {
02880       eventServer = jc_->getEventServer();
02881       initMsgCollection = jc_->getInitMsgCollection();
02882     }
02883     if (eventServer.get() != NULL && initMsgCollection.get() != NULL)
02884     {
02885       if (initMsgCollection->size() > 0)
02886       {
02887         int displayedConsumerCount = 0;
02888         double eventSum = 0.0;
02889         double eventRateSum = 0.0;
02890         double dataRateSum = 0.0;
02892         double now = ForeverCounter::getCurrentTime();
02893         *out << "<table border=\"0\" width=\"100%\">" << std::endl;
02894         *out << "<tr>" << std::endl;
02895         *out << "  <td width=\"25%\" align=\"center\">" << std::endl;
02896         *out << "  </td>" << std::endl;
02897         *out << "    &nbsp;" << std::endl;
02898         *out << "  <td width=\"50%\" align=\"center\">" << std::endl;
02899         *out << "    <font size=\"+2\"><b>Event Server Statistics</b></font>"
02900              << std::endl;
02901         *out << "    <br/>" << std::endl;
02902         *out << "    Data rates are reported in MB/sec." << std::endl;
02903         *out << "    <br/>" << std::endl;
02904         *out << "    Maximum input event rate is "
02905              << eventServer->getMaxEventRate() << " Hz." << std::endl;
02906         *out << "    <br/>" << std::endl;
02907         *out << "    Maximum input data rate is "
02908              << eventServer->getMaxDataRate() << " MB/sec." << std::endl;
02909         *out << "    <br/>" << std::endl;
02910         *out << "    Consumer queue size is " << consumerQueueSize_
02911              << "." << std::endl;
02912         *out << "    <br/>" << std::endl;
02913         *out << "    Selected HLT output module is "
02914              << eventServer->getHLTOutputSelection()
02915              << "." << std::endl;
02916         //*out << "    Fair-share event serving is ";
02917         //if (fairShareES_) {
02918         //  *out << "ON." << std::endl;
02919         //}
02920         //else {
02921         //  *out << "OFF." << std::endl;
02922         //}
02923         *out << "  </td>" << std::endl;
02924         *out << "  <td width=\"25%\" align=\"center\">" << std::endl;
02925         if (autoUpdate) {
02926           *out << "    <a href=\"" << url << "/" << urn
02927                << "/EventServerStats?update=off\">Turn updating OFF</a>"
02928                << std::endl;
02929         }
02930         else {
02931           *out << "    <a href=\"" << url << "/" << urn
02932                << "/EventServerStats?update=on\">Turn updating ON</a>"
02933                << std::endl;
02934         }
02935         *out << "    <br/><br/>" << std::endl;
02936         *out << "    <a href=\"" << url << "/" << urn
02937              << "\">Back to SM Status</a>"
02938              << std::endl;
02939         *out << "  </td>" << std::endl;
02940         *out << "</tr>" << std::endl;
02941         *out << "</table>" << std::endl;
02943         *out << "<h3>Event Server:</h3>" << std::endl;
02944         *out << "<h4 class=\"noBotMarg\">Input Events, Recent Results:</h4>" << std::endl;
02945         *out << "<font size=\"-1\">(Events can be double-counted if they are sent by multiple output modules.)</font><br/><br/>" << std::endl;
02946         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
02947         *out << "<tr>" << std::endl;
02948         *out << "  <th>HLT Output Module</th>" << std::endl;
02949         *out << "  <th>Event Count</th>" << std::endl;
02950         *out << "  <th>Event Rate</th>" << std::endl;
02951         *out << "  <th>Data Rate</th>" << std::endl;
02952         *out << "  <th>Duration (sec)</th>" << std::endl;
02953         *out << "</tr>" << std::endl;
02955         eventSum = 0.0;
02956         eventRateSum = 0.0;
02957         dataRateSum = 0.0;
02958         for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
02959           InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
02960           InitMsgView initView(&(*serializedProds)[0]);
02961           uint32 outputModuleId = initView.outputModuleId();
02963           eventSum += eventServer->getEventCount(EventServer::SHORT_TERM_STATS,
02964                                                  EventServer::INPUT_STATS,
02965                                                  outputModuleId, now);
02966           eventRateSum += eventServer->getEventRate(EventServer::SHORT_TERM_STATS,
02967                                                     EventServer::INPUT_STATS,
02968                                                     outputModuleId, now);
02969           dataRateSum += eventServer->getDataRate(EventServer::SHORT_TERM_STATS,
02970                                                   EventServer::INPUT_STATS,
02971                                                   outputModuleId, now);
02973           *out << "<tr>" << std::endl;
02974           *out << "  <td align=\"center\">" << initView.outputModuleLabel()
02975                << "</td>" << std::endl;
02976           *out << "  <td align=\"center\">"
02977                << eventServer->getEventCount(EventServer::SHORT_TERM_STATS,
02978                                              EventServer::INPUT_STATS,
02979                                              outputModuleId, now)
02980                << "</td>" << std::endl;
02981           *out << "  <td align=\"center\">"
02982                << eventServer->getEventRate(EventServer::SHORT_TERM_STATS,
02983                                             EventServer::INPUT_STATS,
02984                                             outputModuleId, now)
02985                << "</td>" << std::endl;
02986           *out << "  <td align=\"center\">"
02987                << eventServer->getDataRate(EventServer::SHORT_TERM_STATS,
02988                                            EventServer::INPUT_STATS,
02989                                            outputModuleId, now)
02990                << "</td>" << std::endl;
02991           *out << "  <td align=\"center\">"
02992                << eventServer->getDuration(EventServer::SHORT_TERM_STATS,
02993                                            EventServer::INPUT_STATS,
02994                                            outputModuleId, now)
02995                << "</td>" << std::endl;
02996           *out << "</tr>" << std::endl;
02997         }
02999         // add a row with the totals
03000         if (initMsgCollection->size() > 1) {
03001           *out << "<tr>" << std::endl;
03002           *out << "  <td align=\"center\">Totals</td>" << std::endl;
03003           *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03004           *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03005           *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03006           *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03007           *out << "</tr>" << std::endl;
03008         }
03009         *out << "</table>" << std::endl;
03011         *out << "<h4 class=\"noBotMarg\">Accepted Unique Events, Recent Results:</h4>" << std::endl;
03012         *out << "<font size=\"-1\">(Events can be double-counted if they are sent by multiple output modules.)</font><br/><br/>" << std::endl;
03013         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03014         *out << "<tr>" << std::endl;
03015         *out << "  <th>HLT Output Module</th>" << std::endl;
03016         *out << "  <th>Event Count</th>" << std::endl;
03017         *out << "  <th>Event Rate</th>" << std::endl;
03018         *out << "  <th>Data Rate</th>" << std::endl;
03019         *out << "  <th>Duration (sec)</th>" << std::endl;
03020         *out << "</tr>" << std::endl;
03022         eventSum = 0.0;
03023         eventRateSum = 0.0;
03024         dataRateSum = 0.0;
03025         for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
03026           InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
03027           InitMsgView initView(&(*serializedProds)[0]);
03028           uint32 outputModuleId = initView.outputModuleId();
03030           eventSum += eventServer->getEventCount(EventServer::SHORT_TERM_STATS,
03031                                                  EventServer::UNIQUE_ACCEPT_STATS,
03032                                                  outputModuleId, now);
03033           eventRateSum += eventServer->getEventRate(EventServer::SHORT_TERM_STATS,
03034                                                     EventServer::UNIQUE_ACCEPT_STATS,
03035                                                     outputModuleId, now);
03036           dataRateSum += eventServer->getDataRate(EventServer::SHORT_TERM_STATS,
03037                                                   EventServer::UNIQUE_ACCEPT_STATS,
03038                                                   outputModuleId, now);
03040           *out << "<tr>" << std::endl;
03041           *out << "  <td align=\"center\">" << initView.outputModuleLabel()
03042                << "</td>" << std::endl;
03043           *out << "  <td align=\"center\">"
03044                << eventServer->getEventCount(EventServer::SHORT_TERM_STATS,
03045                                              EventServer::UNIQUE_ACCEPT_STATS,
03046                                              outputModuleId, now)
03047                << "</td>" << std::endl;
03048           *out << "  <td align=\"center\">"
03049                << eventServer->getEventRate(EventServer::SHORT_TERM_STATS,
03050                                             EventServer::UNIQUE_ACCEPT_STATS,
03051                                             outputModuleId, now)
03052                << "</td>" << std::endl;
03053           *out << "  <td align=\"center\">"
03054                << eventServer->getDataRate(EventServer::SHORT_TERM_STATS,
03055                                            EventServer::UNIQUE_ACCEPT_STATS,
03056                                            outputModuleId, now)
03057                << "</td>" << std::endl;
03058           *out << "  <td align=\"center\">"
03059                << eventServer->getDuration(EventServer::SHORT_TERM_STATS,
03060                                            EventServer::UNIQUE_ACCEPT_STATS,
03061                                            outputModuleId, now)
03062                << "</td>" << std::endl;
03063           *out << "</tr>" << std::endl;
03064         }
03066         // add a row with the totals
03067         if (initMsgCollection->size() > 1) {
03068           *out << "<tr>" << std::endl;
03069           *out << "  <td align=\"center\">Totals</td>" << std::endl;
03070           *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03071           *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03072           *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03073           *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03074           *out << "</tr>" << std::endl;
03075         }
03076         *out << "</table>" << std::endl;
03078         *out << "<h4 class=\"noBotMarg\">Accepted Events To All Consumers, Recent Results:</h4>" << std::endl;
03079         *out << "<font size=\"-1\">(Events can be double-counted if they are sent by multiple output modules or if they are sent to multiple consumers.)</font><br/><br/>" << std::endl;
03080         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03081         *out << "<tr>" << std::endl;
03082         *out << "  <th>HLT Output Module</th>" << std::endl;
03083         *out << "  <th>Event Count</th>" << std::endl;
03084         *out << "  <th>Event Rate</th>" << std::endl;
03085         *out << "  <th>Data Rate</th>" << std::endl;
03086         *out << "  <th>Duration (sec)</th>" << std::endl;
03087         *out << "</tr>" << std::endl;
03089         eventSum = 0.0;
03090         eventRateSum = 0.0;
03091         dataRateSum = 0.0;
03092         for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
03093           InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
03094           InitMsgView initView(&(*serializedProds)[0]);
03095           uint32 outputModuleId = initView.outputModuleId();
03097           eventSum += eventServer->getEventCount(EventServer::SHORT_TERM_STATS,
03098                                                  EventServer::OUTPUT_STATS,
03099                                                  outputModuleId, now);
03100           eventRateSum += eventServer->getEventRate(EventServer::SHORT_TERM_STATS,
03101                                                     EventServer::OUTPUT_STATS,
03102                                                     outputModuleId, now);
03103           dataRateSum += eventServer->getDataRate(EventServer::SHORT_TERM_STATS,
03104                                                   EventServer::OUTPUT_STATS,
03105                                                   outputModuleId, now);
03107           *out << "<tr>" << std::endl;
03108           *out << "  <td align=\"center\">" << initView.outputModuleLabel()
03109                << "</td>" << std::endl;
03110           *out << "  <td align=\"center\">"
03111                << eventServer->getEventCount(EventServer::SHORT_TERM_STATS,
03112                                              EventServer::OUTPUT_STATS,
03113                                              outputModuleId, now)
03114                << "</td>" << std::endl;
03115           *out << "  <td align=\"center\">"
03116                << eventServer->getEventRate(EventServer::SHORT_TERM_STATS,
03117                                             EventServer::OUTPUT_STATS,
03118                                             outputModuleId, now)
03119                << "</td>" << std::endl;
03120           *out << "  <td align=\"center\">"
03121                << eventServer->getDataRate(EventServer::SHORT_TERM_STATS,
03122                                            EventServer::OUTPUT_STATS,
03123                                            outputModuleId, now)
03124                << "</td>" << std::endl;
03125           *out << "  <td align=\"center\">"
03126                << eventServer->getDuration(EventServer::SHORT_TERM_STATS,
03127                                            EventServer::OUTPUT_STATS,
03128                                            outputModuleId, now)
03129                << "</td>" << std::endl;
03130           *out << "</tr>" << std::endl;
03131         }
03133         // add a row with the totals
03134         if (initMsgCollection->size() > 1) {
03135           *out << "<tr>" << std::endl;
03136           *out << "  <td align=\"center\">Totals</td>" << std::endl;
03137           *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03138           *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03139           *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03140           *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03141           *out << "</tr>" << std::endl;
03142         }
03143         *out << "</table>" << std::endl;
03145         *out << "<h4 class=\"noBotMarg\">Input Events, Full Results:</h4>" << std::endl;
03146         *out << "<font size=\"-1\">(Events can be double-counted if they are sent by multiple output modules.)</font><br/><br/>" << std::endl;
03147         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03148         *out << "<tr>" << std::endl;
03149         *out << "  <th>HLT Output Module</th>" << std::endl;
03150         *out << "  <th>Event Count</th>" << std::endl;
03151         *out << "  <th>Event Rate</th>" << std::endl;
03152         *out << "  <th>Data Rate</th>" << std::endl;
03153         *out << "  <th>Duration (sec)</th>" << std::endl;
03154         *out << "</tr>" << std::endl;
03156         eventSum = 0.0;
03157         eventRateSum = 0.0;
03158         dataRateSum = 0.0;
03159         for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
03160           InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
03161           InitMsgView initView(&(*serializedProds)[0]);
03162           uint32 outputModuleId = initView.outputModuleId();
03164           eventSum += eventServer->getEventCount(EventServer::LONG_TERM_STATS,
03165                                                  EventServer::INPUT_STATS,
03166                                                  outputModuleId, now);
03167           eventRateSum += eventServer->getEventRate(EventServer::LONG_TERM_STATS,
03168                                                     EventServer::INPUT_STATS,
03169                                                     outputModuleId, now);
03170           dataRateSum += eventServer->getDataRate(EventServer::LONG_TERM_STATS,
03171                                                   EventServer::INPUT_STATS,
03172                                                   outputModuleId, now);
03174           *out << "<tr>" << std::endl;
03175           *out << "  <td align=\"center\">" << initView.outputModuleLabel()
03176                << "</td>" << std::endl;
03177           *out << "  <td align=\"center\">"
03178                << eventServer->getEventCount(EventServer::LONG_TERM_STATS,
03179                                              EventServer::INPUT_STATS,
03180                                              outputModuleId, now)
03181                << "</td>" << std::endl;
03182           *out << "  <td align=\"center\">"
03183                << eventServer->getEventRate(EventServer::LONG_TERM_STATS,
03184                                             EventServer::INPUT_STATS,
03185                                             outputModuleId, now)
03186                << "</td>" << std::endl;
03187           *out << "  <td align=\"center\">"
03188                << eventServer->getDataRate(EventServer::LONG_TERM_STATS,
03189                                            EventServer::INPUT_STATS,
03190                                            outputModuleId, now)
03191                << "</td>" << std::endl;
03192           *out << "  <td align=\"center\">"
03193                << eventServer->getDuration(EventServer::LONG_TERM_STATS,
03194                                            EventServer::INPUT_STATS,
03195                                            outputModuleId, now)
03196                << "</td>" << std::endl;
03197           *out << "</tr>" << std::endl;
03198         }
03200         // add a row with the totals
03201         if (initMsgCollection->size() > 1) {
03202           *out << "<tr>" << std::endl;
03203           *out << "  <td align=\"center\">Totals</td>" << std::endl;
03204           *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03205           *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03206           *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03207           *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03208           *out << "</tr>" << std::endl;
03209         }
03210         *out << "</table>" << std::endl;
03212         *out << "<h4 class=\"noBotMarg\">Accepted Unique Events, Full Results:</h4>" << std::endl;
03213         *out << "<font size=\"-1\">(Events can be double-counted if they are sent by multiple output modules.)</font><br/><br/>" << std::endl;
03214         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03215         *out << "<tr>" << std::endl;
03216         *out << "  <th>HLT Output Module</th>" << std::endl;
03217         *out << "  <th>Event Count</th>" << std::endl;
03218         *out << "  <th>Event Rate</th>" << std::endl;
03219         *out << "  <th>Data Rate</th>" << std::endl;
03220         *out << "  <th>Duration (sec)</th>" << std::endl;
03221         *out << "</tr>" << std::endl;
03223         eventSum = 0.0;
03224         eventRateSum = 0.0;
03225         dataRateSum = 0.0;
03226         for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
03227           InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
03228           InitMsgView initView(&(*serializedProds)[0]);
03229           uint32 outputModuleId = initView.outputModuleId();
03231           eventSum += eventServer->getEventCount(EventServer::LONG_TERM_STATS,
03232                                                  EventServer::UNIQUE_ACCEPT_STATS,
03233                                                  outputModuleId, now);
03234           eventRateSum += eventServer->getEventRate(EventServer::LONG_TERM_STATS,
03235                                                     EventServer::UNIQUE_ACCEPT_STATS,
03236                                                     outputModuleId, now);
03237           dataRateSum += eventServer->getDataRate(EventServer::LONG_TERM_STATS,
03238                                                   EventServer::UNIQUE_ACCEPT_STATS,
03239                                                   outputModuleId, now);
03241           *out << "<tr>" << std::endl;
03242           *out << "  <td align=\"center\">" << initView.outputModuleLabel()
03243                << "</td>" << std::endl;
03244           *out << "  <td align=\"center\">"
03245                << eventServer->getEventCount(EventServer::LONG_TERM_STATS,
03246                                              EventServer::UNIQUE_ACCEPT_STATS,
03247                                              outputModuleId, now)
03248                << "</td>" << std::endl;
03249           *out << "  <td align=\"center\">"
03250                << eventServer->getEventRate(EventServer::LONG_TERM_STATS,
03251                                             EventServer::UNIQUE_ACCEPT_STATS,
03252                                             outputModuleId, now)
03253                << "</td>" << std::endl;
03254           *out << "  <td align=\"center\">"
03255                << eventServer->getDataRate(EventServer::LONG_TERM_STATS,
03256                                            EventServer::UNIQUE_ACCEPT_STATS,
03257                                            outputModuleId, now)
03258                << "</td>" << std::endl;
03259           *out << "  <td align=\"center\">"
03260                << eventServer->getDuration(EventServer::LONG_TERM_STATS,
03261                                            EventServer::UNIQUE_ACCEPT_STATS,
03262                                            outputModuleId, now)
03263                << "</td>" << std::endl;
03264           *out << "</tr>" << std::endl;
03265         }
03267         // add a row with the totals
03268         if (initMsgCollection->size() > 1) {
03269           *out << "<tr>" << std::endl;
03270           *out << "  <td align=\"center\">Totals</td>" << std::endl;
03271           *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03272           *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03273           *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03274           *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03275           *out << "</tr>" << std::endl;
03276         }
03277         *out << "</table>" << std::endl;
03279         *out << "<h4 class=\"noBotMarg\">Accepted Events To All Consumers, Full Results:</h4>" << std::endl;
03280         *out << "<font size=\"-1\">(Events can be double-counted if they are sent by multiple output modules or if they are sent to multiple consumers.)</font><br/><br/>" << std::endl;
03281         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03282         *out << "<tr>" << std::endl;
03283         *out << "  <th>HLT Output Module</th>" << std::endl;
03284         *out << "  <th>Event Count</th>" << std::endl;
03285         *out << "  <th>Event Rate</th>" << std::endl;
03286         *out << "  <th>Data Rate</th>" << std::endl;
03287         *out << "  <th>Duration (sec)</th>" << std::endl;
03288         *out << "</tr>" << std::endl;
03290         eventSum = 0.0;
03291         eventRateSum = 0.0;
03292         dataRateSum = 0.0;
03293         for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
03294           InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
03295           InitMsgView initView(&(*serializedProds)[0]);
03296           uint32 outputModuleId = initView.outputModuleId();
03298           eventSum += eventServer->getEventCount(EventServer::LONG_TERM_STATS,
03299                                                  EventServer::OUTPUT_STATS,
03300                                                  outputModuleId, now);
03301           eventRateSum += eventServer->getEventRate(EventServer::LONG_TERM_STATS,
03302                                                     EventServer::OUTPUT_STATS,
03303                                                     outputModuleId, now);
03304           dataRateSum += eventServer->getDataRate(EventServer::LONG_TERM_STATS,
03305                                                   EventServer::OUTPUT_STATS,
03306                                                   outputModuleId, now);
03308           *out << "<tr>" << std::endl;
03309           *out << "  <td align=\"center\">" << initView.outputModuleLabel()
03310                << "</td>" << std::endl;
03311           *out << "  <td align=\"center\">"
03312                << eventServer->getEventCount(EventServer::LONG_TERM_STATS,
03313                                              EventServer::OUTPUT_STATS,
03314                                              outputModuleId, now)
03315                << "</td>" << std::endl;
03316           *out << "  <td align=\"center\">"
03317                << eventServer->getEventRate(EventServer::LONG_TERM_STATS,
03318                                             EventServer::OUTPUT_STATS,
03319                                             outputModuleId, now)
03320                << "</td>" << std::endl;
03321           *out << "  <td align=\"center\">"
03322                << eventServer->getDataRate(EventServer::LONG_TERM_STATS,
03323                                            EventServer::OUTPUT_STATS,
03324                                            outputModuleId, now)
03325                << "</td>" << std::endl;
03326           *out << "  <td align=\"center\">"
03327                << eventServer->getDuration(EventServer::LONG_TERM_STATS,
03328                                            EventServer::OUTPUT_STATS,
03329                                            outputModuleId, now)
03330                << "</td>" << std::endl;
03331           *out << "</tr>" << std::endl;
03332         }
03334         // add a row with the totals
03335         if (initMsgCollection->size() > 1) {
03336           *out << "<tr>" << std::endl;
03337           *out << "  <td align=\"center\">Totals</td>" << std::endl;
03338           *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03339           *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03340           *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03341           *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03342           *out << "</tr>" << std::endl;
03343         }
03344         *out << "</table>" << std::endl;
03346         *out << "<h4>Timing:</h4>" << std::endl;
03347         *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03348         *out << "<tr>" << std::endl;
03349         *out << "  <th>&nbsp;</th>" << std::endl;
03350         *out << "  <th>CPU Time<br/>(sec)</th>" << std::endl;
03351         *out << "  <th>CPU Time<br/>Percent</th>" << std::endl;
03352         *out << "  <th>Real Time<br/>(sec)</th>" << std::endl;
03353         *out << "  <th>Real Time<br/>Percent</th>" << std::endl;
03354         *out << "  <th>Duration (sec)</th>" << std::endl;
03355         *out << "</tr>" << std::endl;
03356         *out << "<tr>" << std::endl;
03357         *out << "  <td align=\"center\">Recent Results</td>" << std::endl;
03358         *out << "  <td align=\"center\">"
03359              << eventServer->getInternalTime(EventServer::SHORT_TERM_STATS,
03360                                              EventServer::CPUTIME,
03361                                              now)
03362              << "</td>" << std::endl;
03363         *out << "  <td align=\"center\">"
03364              << 100 * eventServer->getTimeFraction(EventServer::SHORT_TERM_STATS,
03365                                                    EventServer::CPUTIME,
03366                                                    now)
03367              << "</td>" << std::endl;
03368         *out << "  <td align=\"center\">"
03369              << eventServer->getInternalTime(EventServer::SHORT_TERM_STATS,
03370                                              EventServer::REALTIME,
03371                                              now)
03372              << "</td>" << std::endl;
03373         *out << "  <td align=\"center\">"
03374              << 100 * eventServer->getTimeFraction(EventServer::SHORT_TERM_STATS,
03375                                                    EventServer::REALTIME,
03376                                                    now)
03377              << "</td>" << std::endl;
03378         *out << "  <td align=\"center\">"
03379              << eventServer->getTotalTime(EventServer::SHORT_TERM_STATS,
03380                                           EventServer::REALTIME,
03381                                           now)
03382              << "</td>" << std::endl;
03383         *out << "</tr>" << std::endl;
03384         *out << "<tr>" << std::endl;
03385         *out << "  <td align=\"center\">Full Results</td>" << std::endl;
03386         *out << "  <td align=\"center\">"
03387              << eventServer->getInternalTime(EventServer::LONG_TERM_STATS,
03388                                              EventServer::CPUTIME,
03389                                              now)
03390              << "</td>" << std::endl;
03391         *out << "  <td align=\"center\">"
03392              << 100 * eventServer->getTimeFraction(EventServer::LONG_TERM_STATS,
03393                                                    EventServer::CPUTIME,
03394                                                    now)
03395              << "</td>" << std::endl;
03396         *out << "  <td align=\"center\">"
03397              << eventServer->getInternalTime(EventServer::LONG_TERM_STATS,
03398                                              EventServer::REALTIME,
03399                                              now)
03400              << "</td>" << std::endl;
03401         *out << "  <td align=\"center\">"
03402              << 100 * eventServer->getTimeFraction(EventServer::LONG_TERM_STATS,
03403                                                    EventServer::REALTIME,
03404                                                    now)
03405              << "</td>" << std::endl;
03406         *out << "  <td align=\"center\">"
03407              << eventServer->getTotalTime(EventServer::LONG_TERM_STATS,
03408                                           EventServer::REALTIME,
03409                                           now)
03410              << "</td>" << std::endl;
03411         *out << "</tr>" << std::endl;
03412         *out << "</table>" << std::endl;
03414         *out << "<h3>Consumers:</h3>" << std::endl;
03415         std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 
03416           eventServer->getConsumerTable();
03417         if (consumerTable.size() == 0)
03418         {
03419           *out << "No consumers are currently registered with "
03420                << "this Storage Manager instance.<br/>" << std::endl;
03421         }
03422         else
03423         {
03424           std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 
03425             consumerIter;
03427           // ************************************************************
03428           // * Consumer summary table
03429           // ************************************************************
03430           *out << "<h4>Summary:</h4>" << std::endl;
03431           *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03432           *out << "<tr>" << std::endl;
03433           *out << "  <th>ID</th>" << std::endl;
03434           *out << "  <th>Name</th>" << std::endl;
03435           *out << "  <th>State</th>" << std::endl;
03436           *out << "  <th>Requested<br/>Rate</th>" << std::endl;
03437           *out << "  <th>Requested HLT<br/>Output Module</th>" << std::endl;
03438           *out << "  <th>Trigger<br/>Request</th>" << std::endl;
03439           *out << "</tr>" << std::endl;
03441           for (consumerIter = consumerTable.begin();
03442                consumerIter != consumerTable.end();
03443                ++consumerIter)
03444           {
03445             boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
03446             *out << "<tr>" << std::endl;
03447             *out << "  <td align=\"center\">" << consPtr->getConsumerId()
03448                  << "</td>" << std::endl;
03450             *out << "  <td align=\"center\">";
03451             if (consPtr->isProxyServer()) {
03452               *out << "Proxy Server";
03453             }
03454             else {
03455               *out << consPtr->getConsumerName();
03456             }
03457             *out << "</td>" << std::endl;
03459             *out << "  <td align=\"center\">";
03460             if (consPtr->isDisconnected()) {
03461               *out << "Disconnected";
03462             }
03463             else if (consPtr->isIdle()) {
03464               *out << "Idle";
03465             }
03466             else {
03467               *out << "Active";
03468             }
03469             *out << "</td>" << std::endl;
03471             *out << "  <td align=\"center\">" << consPtr->getRateRequest()
03472                  << " Hz</td>" << std::endl;
03473             if (consPtr->isProxyServer()) {
03474               *out << "  <td align=\"center\">&lt;all&gt;</td>" << std::endl;
03475             }
03476             else {
03477               std::string hltOut = consPtr->getHLTOutputSelection();
03478               if (hltOut.empty()) {
03479                 *out << "  <td align=\"center\">&lt;none&gt;</td>" << std::endl;
03480               }
03481               else {
03482                 *out << "  <td align=\"center\">" << hltOut
03483                      << "</td>" << std::endl;
03484               }
03485             }
03486             *out << "  <td align=\"center\">"
03487                  << InitMsgCollection::stringsToText(consPtr->getTriggerSelection(), 5)
03488                  << "</td>" << std::endl;
03490             *out << "</tr>" << std::endl;
03491           }
03492           *out << "</table>" << std::endl;
03494           // ************************************************************
03495           // * Recent results for queued events
03496           // ************************************************************
03497           *out << "<h4>Queued Events, Recent Results:</h4>" << std::endl;
03498           *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03499           *out << "<tr>" << std::endl;
03500           *out << "  <th>ID</th>" << std::endl;
03501           *out << "  <th>Name</th>" << std::endl;
03502           *out << "  <th>Event Count</th>" << std::endl;
03503           *out << "  <th>Event Rate</th>" << std::endl;
03504           *out << "  <th>Data Rate</th>" << std::endl;
03505           *out << "  <th>Duration<br/>(sec)</th>" << std::endl;
03506           *out << "  <th>Average<br/>Queue Size</th>" << std::endl;
03507           *out << "</tr>" << std::endl;
03509           displayedConsumerCount = 0;
03510           eventSum = 0.0;
03511           eventRateSum = 0.0;
03512           dataRateSum = 0.0;
03513           for (consumerIter = consumerTable.begin();
03514                consumerIter != consumerTable.end();
03515                ++consumerIter)
03516           {
03517             boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
03518             if (consPtr->isDisconnected()) {continue;}
03520             ++displayedConsumerCount;
03521             eventSum += consPtr->getEventCount(ConsumerPipe::SHORT_TERM,
03522                                                ConsumerPipe::QUEUED_EVENTS,
03523                                                now);
03524             eventRateSum += consPtr->getEventRate(ConsumerPipe::SHORT_TERM,
03525                                                   ConsumerPipe::QUEUED_EVENTS,
03526                                                   now);
03527             dataRateSum += consPtr->getDataRate(ConsumerPipe::SHORT_TERM,
03528                                                 ConsumerPipe::QUEUED_EVENTS,
03529                                                 now);
03531             *out << "<tr>" << std::endl;
03532             *out << "  <td align=\"center\">" << consPtr->getConsumerId()
03533                  << "</td>" << std::endl;
03534             *out << "  <td align=\"center\">";
03535             if (consPtr->isProxyServer()) {
03536               *out << "Proxy Server";
03537             }
03538             else {
03539               *out << consPtr->getConsumerName();
03540             }
03541             *out << "</td>" << std::endl;
03543             *out << "  <td align=\"center\">"
03544                  << consPtr->getEventCount(ConsumerPipe::SHORT_TERM,
03545                                            ConsumerPipe::QUEUED_EVENTS,
03546                                            now)
03547                  << "</td>" << std::endl;
03548             *out << "  <td align=\"center\">"
03549                  << consPtr->getEventRate(ConsumerPipe::SHORT_TERM,
03550                                           ConsumerPipe::QUEUED_EVENTS,
03551                                           now)
03552                  << "</td>" << std::endl;
03553             *out << "  <td align=\"center\">"
03554                  << consPtr->getDataRate(ConsumerPipe::SHORT_TERM,
03555                                          ConsumerPipe::QUEUED_EVENTS,
03556                                          now)
03557                  << "</td>" << std::endl;
03558             *out << "  <td align=\"center\">"
03559                  << consPtr->getDuration(ConsumerPipe::SHORT_TERM,
03560                                          ConsumerPipe::QUEUED_EVENTS,
03561                                          now)
03562                  << "</td>" << std::endl;
03563             *out << "  <td align=\"center\">"
03564                  << consPtr->getAverageQueueSize(ConsumerPipe::SHORT_TERM,
03565                                                  ConsumerPipe::QUEUED_EVENTS,
03566                                                  now)
03567                  << "</td>" << std::endl;
03568             *out << "</tr>" << std::endl;
03569           }
03571           // add a row with the totals
03572           if (displayedConsumerCount > 1) {
03573             *out << "<tr>" << std::endl;
03574             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03575             *out << "  <td align=\"center\">Totals</td>" << std::endl;
03576             *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03577             *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03578             *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03579             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03580             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03581             *out << "</tr>" << std::endl;
03582           }
03583           *out << "</table>" << std::endl;
03585           // ************************************************************
03586           // * Recent results for served events
03587           // ************************************************************
03588           *out << "<h4>Served Events, Recent Results:</h4>" << std::endl;
03589           *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03590           *out << "<tr>" << std::endl;
03591           *out << "  <th>ID</th>" << std::endl;
03592           *out << "  <th>Name</th>" << std::endl;
03593           *out << "  <th>Event Count</th>" << std::endl;
03594           *out << "  <th>Event Rate</th>" << std::endl;
03595           *out << "  <th>Data Rate</th>" << std::endl;
03596           *out << "  <th>Duration (sec)</th>" << std::endl;
03597           *out << "</tr>" << std::endl;
03599           displayedConsumerCount = 0;
03600           eventSum = 0.0;
03601           eventRateSum = 0.0;
03602           dataRateSum = 0.0;
03603           for (consumerIter = consumerTable.begin();
03604                consumerIter != consumerTable.end();
03605                ++consumerIter)
03606           {
03607             boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
03608             if (consPtr->isDisconnected()) {continue;}
03610             ++displayedConsumerCount;
03611             eventSum += consPtr->getEventCount(ConsumerPipe::SHORT_TERM,
03612                                                ConsumerPipe::SERVED_EVENTS,
03613                                                now);
03614             eventRateSum += consPtr->getEventRate(ConsumerPipe::SHORT_TERM,
03615                                                   ConsumerPipe::SERVED_EVENTS,
03616                                                   now);
03617             dataRateSum += consPtr->getDataRate(ConsumerPipe::SHORT_TERM,
03618                                                 ConsumerPipe::SERVED_EVENTS,
03619                                                 now);
03621             *out << "<tr>" << std::endl;
03622             *out << "  <td align=\"center\">" << consPtr->getConsumerId()
03623                  << "</td>" << std::endl;
03624             *out << "  <td align=\"center\">";
03625             if (consPtr->isProxyServer()) {
03626               *out << "Proxy Server";
03627             }
03628             else {
03629               *out << consPtr->getConsumerName();
03630             }
03631             *out << "</td>" << std::endl;
03633             *out << "  <td align=\"center\">"
03634                  << consPtr->getEventCount(ConsumerPipe::SHORT_TERM,
03635                                            ConsumerPipe::SERVED_EVENTS,
03636                                            now)
03637                  << "</td>" << std::endl;
03638             *out << "  <td align=\"center\">"
03639                  << consPtr->getEventRate(ConsumerPipe::SHORT_TERM,
03640                                           ConsumerPipe::SERVED_EVENTS,
03641                                           now)
03642                  << "</td>" << std::endl;
03643             *out << "  <td align=\"center\">"
03644                  << consPtr->getDataRate(ConsumerPipe::SHORT_TERM,
03645                                          ConsumerPipe::SERVED_EVENTS,
03646                                          now)
03647                  << "</td>" << std::endl;
03648             *out << "  <td align=\"center\">"
03649                  << consPtr->getDuration(ConsumerPipe::SHORT_TERM,
03650                                          ConsumerPipe::SERVED_EVENTS,
03651                                          now)
03652                  << "</td>" << std::endl;
03653             *out << "</tr>" << std::endl;
03654           }
03656           // add a row with the totals
03657           if (displayedConsumerCount > 1) {
03658             *out << "<tr>" << std::endl;
03659             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03660             *out << "  <td align=\"center\">Totals</td>" << std::endl;
03661             *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03662             *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03663             *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03664             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03665             *out << "</tr>" << std::endl;
03666           }
03667           *out << "</table>" << std::endl;
03669           // ************************************************************
03670           // * Full results for queued events
03671           // ************************************************************
03672           *out << "<h4>Queued Events, Full Results:</h4>" << std::endl;
03673           *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03674           *out << "<tr>" << std::endl;
03675           *out << "  <th>ID</th>" << std::endl;
03676           *out << "  <th>Name</th>" << std::endl;
03677           *out << "  <th>Event Count</th>" << std::endl;
03678           *out << "  <th>Event Rate</th>" << std::endl;
03679           *out << "  <th>Data Rate</th>" << std::endl;
03680           *out << "  <th>Duration<br/>(sec)</th>" << std::endl;
03681           *out << "  <th>Average<br/>Queue Size</th>" << std::endl;
03682           *out << "</tr>" << std::endl;
03684           displayedConsumerCount = 0;
03685           eventSum = 0.0;
03686           eventRateSum = 0.0;
03687           dataRateSum = 0.0;
03688           for (consumerIter = consumerTable.begin();
03689                consumerIter != consumerTable.end();
03690                ++consumerIter)
03691           {
03692             boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
03693             if (consPtr->isDisconnected()) {continue;}
03695             ++displayedConsumerCount;
03696             eventSum += consPtr->getEventCount(ConsumerPipe::LONG_TERM,
03697                                                ConsumerPipe::QUEUED_EVENTS,
03698                                                now);
03699             eventRateSum += consPtr->getEventRate(ConsumerPipe::LONG_TERM,
03700                                                   ConsumerPipe::QUEUED_EVENTS,
03701                                                   now);
03702             dataRateSum += consPtr->getDataRate(ConsumerPipe::LONG_TERM,
03703                                                 ConsumerPipe::QUEUED_EVENTS,
03704                                                 now);
03706             *out << "<tr>" << std::endl;
03707             *out << "  <td align=\"center\">" << consPtr->getConsumerId()
03708                  << "</td>" << std::endl;
03709             *out << "  <td align=\"center\">";
03710             if (consPtr->isProxyServer()) {
03711               *out << "Proxy Server";
03712             }
03713             else {
03714               *out << consPtr->getConsumerName();
03715             }
03716             *out << "</td>" << std::endl;
03718             *out << "  <td align=\"center\">"
03719                  << consPtr->getEventCount(ConsumerPipe::LONG_TERM,
03720                                            ConsumerPipe::QUEUED_EVENTS,
03721                                            now)
03722                  << "</td>" << std::endl;
03723             *out << "  <td align=\"center\">"
03724                  << consPtr->getEventRate(ConsumerPipe::LONG_TERM,
03725                                           ConsumerPipe::QUEUED_EVENTS,
03726                                           now)
03727                  << "</td>" << std::endl;
03728             *out << "  <td align=\"center\">"
03729                  << consPtr->getDataRate(ConsumerPipe::LONG_TERM,
03730                                          ConsumerPipe::QUEUED_EVENTS,
03731                                          now)
03732                  << "</td>" << std::endl;
03733             *out << "  <td align=\"center\">"
03734                  << consPtr->getDuration(ConsumerPipe::LONG_TERM,
03735                                          ConsumerPipe::QUEUED_EVENTS,
03736                                          now)
03737                  << "</td>" << std::endl;
03738             *out << "  <td align=\"center\">"
03739                  << consPtr->getAverageQueueSize(ConsumerPipe::LONG_TERM,
03740                                                  ConsumerPipe::QUEUED_EVENTS,
03741                                                  now)
03742                  << "</td>" << std::endl;
03743             *out << "</tr>" << std::endl;
03744           }
03746           // add a row with the totals
03747           if (displayedConsumerCount > 1) {
03748             *out << "<tr>" << std::endl;
03749             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03750             *out << "  <td align=\"center\">Totals</td>" << std::endl;
03751             *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03752             *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03753             *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03754             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03755             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03756             *out << "</tr>" << std::endl;
03757           }
03758           *out << "</table>" << std::endl;
03760           // ************************************************************
03761           // * Full results for served events
03762           // ************************************************************
03763           *out << "<h4>Served Events, Full Results:</h4>" << std::endl;
03764           *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03765           *out << "<tr>" << std::endl;
03766           *out << "  <th>ID</th>" << std::endl;
03767           *out << "  <th>Name</th>" << std::endl;
03768           *out << "  <th>Event Count</th>" << std::endl;
03769           *out << "  <th>Event Rate</th>" << std::endl;
03770           *out << "  <th>Data Rate</th>" << std::endl;
03771           *out << "  <th>Duration (sec)</th>" << std::endl;
03772           *out << "</tr>" << std::endl;
03774           displayedConsumerCount = 0;
03775           eventSum = 0.0;
03776           eventRateSum = 0.0;
03777           dataRateSum = 0.0;
03778           for (consumerIter = consumerTable.begin();
03779                consumerIter != consumerTable.end();
03780                ++consumerIter)
03781           {
03782             boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
03783             if (consPtr->isDisconnected ()) {continue;}
03785             ++displayedConsumerCount;
03786             eventSum += consPtr->getEventCount(ConsumerPipe::LONG_TERM,
03787                                                ConsumerPipe::SERVED_EVENTS,
03788                                                now);
03789             eventRateSum += consPtr->getEventRate(ConsumerPipe::LONG_TERM,
03790                                                   ConsumerPipe::SERVED_EVENTS,
03791                                                   now);
03792             dataRateSum += consPtr->getDataRate(ConsumerPipe::LONG_TERM,
03793                                                 ConsumerPipe::SERVED_EVENTS,
03794                                                 now);
03796             *out << "<tr>" << std::endl;
03797             *out << "  <td align=\"center\">" << consPtr->getConsumerId()
03798                  << "</td>" << std::endl;
03799             *out << "  <td align=\"center\">";
03800             if (consPtr->isProxyServer()) {
03801               *out << "Proxy Server";
03802             }
03803             else {
03804               *out << consPtr->getConsumerName();
03805             }
03806             *out << "</td>" << std::endl;
03808             *out << "  <td align=\"center\">"
03809                  << consPtr->getEventCount(ConsumerPipe::LONG_TERM,
03810                                            ConsumerPipe::SERVED_EVENTS,
03811                                            now)
03812                  << "</td>" << std::endl;
03813             *out << "  <td align=\"center\">"
03814                  << consPtr->getEventRate(ConsumerPipe::LONG_TERM,
03815                                           ConsumerPipe::SERVED_EVENTS,
03816                                           now)
03817                  << "</td>" << std::endl;
03818             *out << "  <td align=\"center\">"
03819                  << consPtr->getDataRate(ConsumerPipe::LONG_TERM,
03820                                          ConsumerPipe::SERVED_EVENTS,
03821                                          now)
03822                  << "</td>" << std::endl;
03823             *out << "  <td align=\"center\">"
03824                  << consPtr->getDuration(ConsumerPipe::LONG_TERM,
03825                                          ConsumerPipe::SERVED_EVENTS,
03826                                          now)
03827                  << "</td>" << std::endl;
03828             *out << "</tr>" << std::endl;
03829           }
03831           // add a row with the totals
03832           if (displayedConsumerCount > 1) {
03833             *out << "<tr>" << std::endl;
03834             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03835             *out << "  <td align=\"center\">Totals</td>" << std::endl;
03836             *out << "  <td align=\"center\">" << eventSum << "</td>" << std::endl;
03837             *out << "  <td align=\"center\">" << eventRateSum << "</td>" << std::endl;
03838             *out << "  <td align=\"center\">" << dataRateSum << "</td>" << std::endl;
03839             *out << "  <td align=\"center\">&nbsp;</td>" << std::endl;
03840             *out << "</tr>" << std::endl;
03841           }
03842           *out << "</table>" << std::endl;
03843         }
03844       }
03845       else
03846       {
03847         *out << "<br/>Waiting for INIT messages from the filter units...<br/>"
03848              << std::endl;
03849       }
03850     }
03851     else
03852     {
03853       *out << "<br/>The system is unable to fetch the Event Server "
03854            << "instance or the Init Message Collection instance. "
03855            << "This is a (very) unexpected error and could "
03856            << "be caused by an uninitialized JobController.<br/>"
03857            << std::endl;
03858     }
03860     if(jc_->getInitMsgCollection().get() != NULL &&
03861        jc_->getInitMsgCollection()->size() > 0)
03862     {
03863       boost::shared_ptr<InitMsgCollection> initMsgCollection =
03864         jc_->getInitMsgCollection();
03865       *out << "<h3>HLT Trigger Paths:</h3>" << std::endl;
03866       *out << "<table border=\"1\" width=\"100%\">" << std::endl;
03868       {
03869         InitMsgSharedPtr serializedProds = initMsgCollection->getLastElement();
03870         InitMsgView initView(&(*serializedProds)[0]);
03871         Strings triggerNameList;
03872         initView.hltTriggerNames(triggerNameList);
03874         *out << "<tr>" << std::endl;
03875         *out << "  <td align=\"left\" valign=\"top\">"
03876              << "Full Trigger List</td>" << std::endl;
03877         *out << "  <td align=\"left\" valign=\"top\">"
03878              << InitMsgCollection::stringsToText(triggerNameList, 0)
03879              << "</td>" << std::endl;
03880         *out << "</tr>" << std::endl;
03881       }
03883       for (int idx = 0; idx < initMsgCollection->size(); ++idx) {
03884         InitMsgSharedPtr serializedProds = initMsgCollection->getElementAt(idx);
03885         InitMsgView initView(&(*serializedProds)[0]);
03886         Strings triggerSelectionList;
03887         initView.hltTriggerSelections(triggerSelectionList);
03889         *out << "<tr>" << std::endl;
03890         *out << "  <td align=\"left\" valign=\"top\">"
03891              << initView.outputModuleLabel()
03892              << " Output Module</td>" << std::endl;
03893         *out << "  <td align=\"left\" valign=\"top\">"
03894              << InitMsgCollection::stringsToText(triggerSelectionList, 0)
03895              << "</td>" << std::endl;
03896         *out << "</tr>" << std::endl;
03897       }
03899       *out << "</table>" << std::endl;
03900     }
03901   }
03902   else
03903   {
03904     *out << "<br/>Event server statistics are only available when the "
03905          << "Storage Manager is in the Enabled state.<br/>" << std::endl;
03906   }
03908   *out << "<br/><hr/>" << std::endl;
03909   char timeString[64];
03910   time_t now = time(0);
03911   strftime(timeString, 60, "%d-%b-%Y %H:%M:%S %Z", localtime(&now));
03912   *out << "Last updated: " << timeString << std::endl;;
03913   *out << "</body>" << std::endl;
03914   *out << "</html>" << std::endl;
03915 }
03918 void StorageManager::consumerWebPage(xgi::Input *in, xgi::Output *out)
03919   throw (xgi::exception::Exception)
03920 {
03921   if(fsm_.stateName()->toString() == "Enabled")
03922   { // what is the right place for this?
03924   std::string consumerName = "None provided";
03925   std::string consumerPriority = "normal";
03926   std::string consumerRequest = "<>";
03927   std::string consumerHost = in->getenv("REMOTE_HOST");
03929   // read the consumer registration message from the https input stream
03930   std::string lengthString = in->getenv("CONTENT_LENGTH");
03931   unsigned long contentLength = std::atol(lengthString.c_str());
03932   if (contentLength > 0)
03933   {
03934     auto_ptr< vector<char> > bufPtr(new vector<char>(contentLength));
03935     in->read(&(*bufPtr)[0], contentLength);
03936     ConsRegRequestView requestMessage(&(*bufPtr)[0]);
03937     consumerName = requestMessage.getConsumerName();
03938     consumerPriority = requestMessage.getConsumerPriority();
03939     std::string reqString = requestMessage.getRequestParameterSet();
03940     if (reqString.size() >= 2) consumerRequest = reqString;
03941   }
03943   // resize the local buffer, if needed, to handle a minimal response message
03944   unsigned int responseSize = 200;
03945   if (mybuffer_.capacity() < responseSize) mybuffer_.resize(responseSize);
03947   // fetch the event server
03948   // (it and/or the job controller may not have been created yet)
03949   boost::shared_ptr<EventServer> eventServer;
03950   if (jc_.get() != NULL)
03951   {
03952     eventServer = jc_->getEventServer();
03953   }
03955   // if no event server, tell the consumer that we're not ready
03956   if (eventServer.get() == NULL)
03957   {
03958     // build the registration response into the message buffer
03959     ConsRegResponseBuilder respMsg(&mybuffer_[0], mybuffer_.capacity(),
03960                                    ConsRegResponseBuilder::ES_NOT_READY, 0);
03961     // debug message so that compiler thinks respMsg is used
03962     FDEBUG(20) << "Registration response size =  " <<
03963       respMsg.size() << std::endl;
03964   }
03965   else
03966   {
03967     // resize the local buffer, if needed, to handle a full response message
03968     int mapStringSize = eventServer->getSelectionTableStringSize();
03969     responseSize += (int) (2.5 * mapStringSize);
03970     if (mybuffer_.capacity() < responseSize) mybuffer_.resize(responseSize);
03972     // fetch the event selection request from the consumer request
03973     edm::ParameterSet requestParamSet(consumerRequest);
03974     Strings selectionRequest =
03975       EventSelector::getEventSelectionVString(requestParamSet);
03976     Strings modifiedRequest =
03977       eventServer->updateTriggerSelectionForStreams(selectionRequest);
03979     // pull the rate request out of the consumer parameter set, too
03980     double maxEventRequestRate =
03981       requestParamSet.getUntrackedParameter<double>("maxEventRequestRate", 1.0);
03983     // pull the HLT output module selection out of the PSet
03984     // (default is empty string)
03985     std::string hltOMLabel =
03986       requestParamSet.getUntrackedParameter<std::string>("SelectHLTOutput",
03987                                                          std::string());
03989     // create the local consumer interface and add it to the event server
03990     boost::shared_ptr<ConsumerPipe>
03991       consPtr(new ConsumerPipe(consumerName, consumerPriority,
03992                                activeConsumerTimeout_.value_,
03993                                idleConsumerTimeout_.value_,
03994                                modifiedRequest, maxEventRequestRate,
03995                                hltOMLabel,
03996                                consumerHost, consumerQueueSize_));
03997     eventServer->addConsumer(consPtr);
03998     // over-ride pushmode if not set in StorageManager
03999     if(("PushMode") == 0) && !pushMode_)
04000         consPtr->setPushMode(false);
04002     // build the registration response into the message buffer
04003     ConsRegResponseBuilder respMsg(&mybuffer_[0], mybuffer_.capacity(),
04004                                    0, consPtr->getConsumerId());
04006     // add the stream selection table to the proxy server response
04007     if (consPtr->isProxyServer()) {
04008       respMsg.setStreamSelectionTable(eventServer->getStreamSelectionTable());
04009     }
04011     // debug message so that compiler thinks respMsg is used
04012     FDEBUG(20) << "Registration response size =  " <<
04013       respMsg.size() << std::endl;
04014   }
04016   // send the response
04017   ConsRegResponseView responseMessage(&mybuffer_[0]);
04018   unsigned int len = responseMessage.size();
04020   out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
04021   out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
04022   out->write((char*) &mybuffer_[0],len);
04024   } else { // is this the right thing to send?
04025    // In wrong state for this message - return zero length stream, should return Msg NOTREADY
04026    int len = 0;
04027    out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
04028    out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
04029    out->write((char*) &mybuffer_[0],len);
04030   }
04032 }
04035 void StorageManager::DQMeventdataWebPage(xgi::Input *in, xgi::Output *out)
04036   throw (xgi::exception::Exception)
04037 {
04038   // default the message length to zero
04039   int len=0;
04041   // determine the consumer ID from the event request
04042   // message, if it is available.
04043   unsigned int consumerId = 0;
04044   std::string lengthString = in->getenv("CONTENT_LENGTH");
04045   unsigned int contentLength = std::atol(lengthString.c_str());
04046   if (contentLength > 0) 
04047   {
04048     auto_ptr< vector<char> > bufPtr(new vector<char>(contentLength));
04049     in->read(&(*bufPtr)[0], contentLength);
04050     OtherMessageView requestMessage(&(*bufPtr)[0]);
04051     if (requestMessage.code() == Header::DQMEVENT_REQUEST)
04052     {
04053       uint8 *bodyPtr = requestMessage.msgBody();
04054       consumerId = convert32(bodyPtr);
04055     }
04056   }
04058   // first test if StorageManager is in Enabled state and this is a valid request
04059   // there must also be DQM data available
04060   if(fsm_.stateName()->toString() == "Enabled" && consumerId != 0)
04061   {
04062     boost::shared_ptr<DQMEventServer> eventServer;
04063     if (jc_.get() != NULL)
04064     {
04065       eventServer = jc_->getDQMEventServer();
04066     }
04067     if (eventServer.get() != NULL)
04068     {
04069       boost::shared_ptr< std::vector<char> > bufPtr =
04070         eventServer->getDQMEvent(consumerId);
04071       if (bufPtr.get() != NULL)
04072       {
04073         DQMEventMsgView msgView(&(*bufPtr)[0]);
04075         // what if mybuffer_ is used in multiple threads? Can it happen?
04076         unsigned char* from = msgView.startAddress();
04077         unsigned int dsize = msgView.size();
04078         if(mybuffer_.capacity() < dsize) mybuffer_.resize(dsize);
04079         unsigned char* pos = (unsigned char*) &mybuffer_[0];
04081         copy(from,from+dsize,pos);
04082         len = dsize;
04083         FDEBUG(10) << "sending update at event " << msgView.eventNumberAtUpdate() << std::endl;
04084       }
04085     }
04087     // check if zero length is sent when there is no valid data
04088     // i.e. on getDQMEvent, can already send zero length if request is invalid
04089     out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
04090     out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
04091     out->write((char*) &mybuffer_[0],len);
04092   } // else send DONE as reponse (could be end of a run)
04093   else
04094   {
04095     // not an event request or not in enabled state, just send DONE message
04096     OtherMessageBuilder othermsg(&mybuffer_[0],Header::DONE);
04097     len = othermsg.size();
04099     out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
04100     out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
04101     out->write((char*) &mybuffer_[0],len);
04102   }
04104 }
04107 void StorageManager::DQMconsumerWebPage(xgi::Input *in, xgi::Output *out)
04108   throw (xgi::exception::Exception)
04109 {
04110   if(fsm_.stateName()->toString() == "Enabled")
04111   { // We need to be in the enabled state
04113     std::string consumerName = "None provided";
04114     std::string consumerPriority = "normal";
04115     std::string consumerRequest = "*";
04116     std::string consumerHost = in->getenv("REMOTE_HOST");
04118     // read the consumer registration message from the https input stream
04119     std::string lengthString = in->getenv("CONTENT_LENGTH");
04120     unsigned int contentLength = std::atol(lengthString.c_str());
04121     if (contentLength > 0)
04122     {
04123       auto_ptr< vector<char> > bufPtr(new vector<char>(contentLength));
04124       in->read(&(*bufPtr)[0], contentLength);
04125       ConsRegRequestView requestMessage(&(*bufPtr)[0]);
04126       consumerName = requestMessage.getConsumerName();
04127       consumerPriority = requestMessage.getConsumerPriority();
04128       // for DQM consumers top folder name is stored in the "parameteSet"
04129       std::string reqFolder = requestMessage.getRequestParameterSet();
04130       if (reqFolder.size() >= 1) consumerRequest = reqFolder;
04131     }
04133     // create the buffer to hold the registration reply message
04134     const int BUFFER_SIZE = 100;
04135     char msgBuff[BUFFER_SIZE];
04137     // fetch the DQMevent server
04138     // (it and/or the job controller may not have been created yet
04139     //  if not in the enabled state)
04140     boost::shared_ptr<DQMEventServer> eventServer;
04141     if (jc_.get() != NULL)
04142     {
04143       eventServer = jc_->getDQMEventServer();
04144     }
04146     // if no event server, tell the consumer that we're not ready
04147     if (eventServer.get() == NULL)
04148     {
04149       // build the registration response into the message buffer
04150       ConsRegResponseBuilder respMsg(msgBuff, BUFFER_SIZE,
04151                                      ConsRegResponseBuilder::ES_NOT_READY, 0);
04152       // debug message so that compiler thinks respMsg is used
04153       FDEBUG(20) << "Registration response size =  " <<
04154         respMsg.size() << std::endl;
04155     }
04156     else
04157     {
04158       // create the local consumer interface and add it to the event server
04159       boost::shared_ptr<DQMConsumerPipe>
04160         consPtr(new DQMConsumerPipe(consumerName, consumerPriority,
04161                                     DQMactiveConsumerTimeout_.value_,
04162                                     DQMidleConsumerTimeout_.value_,
04163                                     consumerRequest, consumerHost,
04164                                     DQMconsumerQueueSize_));
04165       eventServer->addConsumer(consPtr);
04166       // over-ride pushmode if not set in StorageManager
04167       if(("PushMode") == 0) && !pushMode_)
04168           consPtr->setPushMode(false);
04170       // initialize it straight away (should later pass in the folder name to
04171       // optionally change the selection on a register?
04172       consPtr->initializeSelection();
04174       // build the registration response into the message buffer
04175       ConsRegResponseBuilder respMsg(msgBuff, BUFFER_SIZE,
04176                                      0, consPtr->getConsumerId());
04177       // debug message so that compiler thinks respMsg is used
04178       FDEBUG(20) << "Registration response size =  " <<
04179         respMsg.size() << std::endl;
04180     }
04182     // send the response
04183     ConsRegResponseView responseMessage(msgBuff);
04184     unsigned int len = responseMessage.size();
04185     if(mybuffer_.capacity() < len) mybuffer_.resize(len);
04186     for (unsigned int i=0; i<len; ++i) mybuffer_[i]=msgBuff[i];
04188     out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
04189     out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
04190     out->write((char*) &mybuffer_[0],len);
04192   } else { // is this the right thing to send?
04193    // In wrong state for this message - return zero length stream, should return Msg NOTREADY
04194    int len = 0;
04195    out->getHTTPResponseHeader().addHeader("Content-Type", "application/octet-stream");
04196    out->getHTTPResponseHeader().addHeader("Content-Transfer-Encoding", "binary");
04197    out->write((char*) &mybuffer_[0],len);
04198   }
04200 }
04202 //------------------------------------------------------------------------------
04203 // Everything that has to do with the flash list goes here
04204 // 
04205 // - setupFlashList()                  - setup variables and initialize them
04206 // - actionPerformed(xdata::Event &e)  - update values in flash list
04207 //------------------------------------------------------------------------------
04208 void StorageManager::setupFlashList()
04209 {
04210   //----------------------------------------------------------------------------
04211   // Setup the header variables
04212   //----------------------------------------------------------------------------
04213   class_    = getApplicationDescriptor()->getClassName();
04214   instance_ = getApplicationDescriptor()->getInstance();
04215   std::string url;
04216   url       = getApplicationDescriptor()->getContextDescriptor()->getURL();
04217   url      += "/";
04218   url      += getApplicationDescriptor()->getURN();
04219   url_      = url;
04221   //----------------------------------------------------------------------------
04222   // Create/Retrieve an infospace which can be monitored
04223   //----------------------------------------------------------------------------
04224   std::ostringstream oss;
04225   oss << "urn:xdaq-monitorable-" << class_.value_;
04226   toolbox::net::URN urn = this->createQualifiedInfoSpace(oss.str());
04227   xdata::InfoSpace *is = xdata::getInfoSpaceFactory()->get(urn.toString());
04229   //----------------------------------------------------------------------------
04230   // Publish monitor data in monitorable info space -- Head
04231   //----------------------------------------------------------------------------
04232   is->fireItemAvailable("class",                &class_);
04233   is->fireItemAvailable("instance",             &instance_);
04234   is->fireItemAvailable("runNumber",            &runNumber_);
04235   is->fireItemAvailable("url",                  &url_);
04236   // Body
04237   is->fireItemAvailable("receivedFrames",       &receivedFrames_);
04238   // should this be here also??
04239   is->fireItemAvailable("storedEvents",         &storedEvents_);
04240   is->fireItemAvailable("closedFiles",          &closedFiles_);
04241   is->fireItemAvailable("receivedEvents",       &receivedEvents_);
04242   is->fireItemAvailable("receivedErrorEvents",  &receivedErrorEvents_);
04243   is->fireItemAvailable("namesOfStream",      &namesOfStream_);
04244   is->fireItemAvailable("namesOfOutMod",      &namesOfOutMod_);
04245   is->fireItemAvailable("dqmRecords",           &dqmRecords_);
04246   is->fireItemAvailable("receivedVolume",       &receivedVolume_);
04247   is->fireItemAvailable("storedVolume",         &storedVolume_);
04248   is->fireItemAvailable("memoryUsed",           &memoryUsed_);
04249   is->fireItemAvailable("instantBandwidth",     &instantBandwidth_);
04250   is->fireItemAvailable("instantRate",          &instantRate_);
04251   is->fireItemAvailable("instantLatency",       &instantLatency_);
04252   is->fireItemAvailable("maxBandwidth",         &maxBandwidth_);
04253   is->fireItemAvailable("minBandwidth",         &minBandwidth_);
04254   is->fireItemAvailable("duration",             &duration_);
04255   is->fireItemAvailable("totalSamples",         &totalSamples_);
04256   is->fireItemAvailable("meanBandwidth",        &meanBandwidth_);
04257   is->fireItemAvailable("meanRate",             &meanRate_);
04258   is->fireItemAvailable("meanLatency",          &meanLatency_);
04259   is->fireItemAvailable("STparameterSet",       &offConfig_);
04260   is->fireItemAvailable("stateName",            fsm_.stateName());
04261   is->fireItemAvailable("progressMarker",       &progressMarker_);
04262   is->fireItemAvailable("connectedRBs",         &connectedRBs_);
04263   is->fireItemAvailable("pushMode2Proxy",       &pushmode2proxy_);
04264   is->fireItemAvailable("collateDQM",           &collateDQM_);
04265   is->fireItemAvailable("archiveDQM",           &archiveDQM_);
04266   is->fireItemAvailable("archiveIntervalDQM",   &archiveIntervalDQM_);
04267   is->fireItemAvailable("purgeTimeDQM",         &purgeTimeDQM_);
04268   is->fireItemAvailable("readyTimeDQM",         &readyTimeDQM_);
04269   is->fireItemAvailable("filePrefixDQM",        &filePrefixDQM_);
04270   is->fireItemAvailable("useCompressionDQM",    &useCompressionDQM_);
04271   is->fireItemAvailable("compressionLevelDQM",  &compressionLevelDQM_);
04272   is->fireItemAvailable("nLogicalDisk",         &nLogicalDisk_);
04273   is->fireItemAvailable("fileCatalog",          &fileCatalog_);
04274   is->fireItemAvailable("fileName",             &fileName_);
04275   is->fireItemAvailable("filePath",             &filePath_);
04276   is->fireItemAvailable("setupLabel",           &setupLabel_);
04277   is->fireItemAvailable("highWaterMark",        &highWaterMark_);
04278   is->fireItemAvailable("lumiSectionTimeOut",   &lumiSectionTimeOut_);
04279   is->fireItemAvailable("exactFileSizeTest",    &exactFileSizeTest_);
04280   is->fireItemAvailable("fileClosingTestInterval",&fileClosingTestInterval_);
04281   is->fireItemAvailable("maxESEventRate",       &maxESEventRate_);
04282   is->fireItemAvailable("maxESDataRate",        &maxESDataRate_);
04283   is->fireItemAvailable("activeConsumerTimeout",&activeConsumerTimeout_);
04284   is->fireItemAvailable("idleConsumerTimeout",  &idleConsumerTimeout_);
04285   is->fireItemAvailable("consumerQueueSize",    &consumerQueueSize_);
04286   is->fireItemAvailable("esSelectedHLTOutputModule",&esSelectedHLTOutputModule_);
04287   //is->fireItemAvailable("fairShareES",          &fairShareES_);
04289   //----------------------------------------------------------------------------
04290   // Attach listener to myCounter_ to detect retrieval event
04291   //----------------------------------------------------------------------------
04292   is->addItemRetrieveListener("class",                this);
04293   is->addItemRetrieveListener("instance",             this);
04294   is->addItemRetrieveListener("runNumber",            this);
04295   is->addItemRetrieveListener("url",                  this);
04296   // Body
04297   is->addItemRetrieveListener("receivedFrames",       this);
04298   // should this be here also??
04299   //is->addItemRetrieveListener("storedEvents",         this);
04300   is->addItemRetrieveListener("receivedEvents",       this);
04301   is->addItemRetrieveListener("namesOfStream", this);
04302   is->addItemRetrieveListener("namesOfOutMod", this);
04303   is->addItemRetrieveListener("dqmRecords",           this);
04304   is->addItemRetrieveListener("receivedVolume",       this);
04305   is->addItemRetrieveListener("storedVolume",         this);
04306   is->addItemRetrieveListener("memoryUsed",           this);
04307   is->addItemRetrieveListener("instantBandwidth",     this);
04308   is->addItemRetrieveListener("instantRate",          this);
04309   is->addItemRetrieveListener("instantLatency",       this);
04310   is->addItemRetrieveListener("maxBandwidth",         this);
04311   is->addItemRetrieveListener("minBandwidth",         this);
04312   is->addItemRetrieveListener("duration",             this);
04313   is->addItemRetrieveListener("totalSamples",         this);
04314   is->addItemRetrieveListener("meanBandwidth",        this);
04315   is->addItemRetrieveListener("meanRate",             this);
04316   is->addItemRetrieveListener("meanLatency",          this);
04317   is->addItemRetrieveListener("STparameterSet",       this);
04318   is->addItemRetrieveListener("stateName",            this);
04319   is->addItemRetrieveListener("progressMarker",       this);
04320   is->addItemRetrieveListener("connectedRBs",         this);
04321   is->addItemRetrieveListener("pushMode2Proxy",       this);
04322   is->addItemRetrieveListener("collateDQM",           this);
04323   is->addItemRetrieveListener("archiveDQM",           this);
04324   is->addItemRetrieveListener("archiveIntervalDQM",   this);
04325   is->addItemRetrieveListener("purgeTimeDQM",         this);
04326   is->addItemRetrieveListener("readyTimeDQM",         this);
04327   is->addItemRetrieveListener("filePrefixDQM",        this);
04328   is->addItemRetrieveListener("useCompressionDQM",    this);
04329   is->addItemRetrieveListener("compressionLevelDQM",  this);
04330   is->addItemRetrieveListener("nLogicalDisk",         this);
04331   is->addItemRetrieveListener("fileCatalog",          this);
04332   is->addItemRetrieveListener("fileName",             this);
04333   is->addItemRetrieveListener("filePath",             this);
04334   is->addItemRetrieveListener("setupLabel",           this);
04335   is->addItemRetrieveListener("highWaterMark",        this);
04336   is->addItemRetrieveListener("lumiSectionTimeOut",   this);
04337   is->addItemRetrieveListener("exactFileSizeTest",    this);
04338   is->addItemRetrieveListener("fileClosingTestInterval",this);
04339   is->addItemRetrieveListener("maxESEventRate",       this);
04340   is->addItemRetrieveListener("maxESDataRate",        this);
04341   is->addItemRetrieveListener("activeConsumerTimeout",this);
04342   is->addItemRetrieveListener("idleConsumerTimeout",  this);
04343   is->addItemRetrieveListener("consumerQueueSize",    this);
04344   is->addItemRetrieveListener("esSelectedHLTOutputModule",this);
04345   //is->addItemRetrieveListener("fairShareES",          this);
04346   //----------------------------------------------------------------------------
04347 }
04350 void StorageManager::actionPerformed(xdata::Event& e)  
04351 {
04352   // 14-Oct-2008, KAB - skip all processing in this method, for now,
04353   // when the SM state is halted.  This will protect against the use
04354   // of un-initialized variables (like jc_).
04355   if (fsm_.stateName()->toString()=="Halted") {return;}
04356   if (fsm_.stateName()->toString()=="halting") {return;}
04357   // paranoia - also return if jc_.get() is null.  Although, to do this
04358   // right, we would need a lock
04359   if (jc_.get() == 0) {return;}
04361   if (e.type() == "ItemRetrieveEvent") {
04362     std::ostringstream oss;
04363     oss << "urn:xdaq-monitorable:" << class_.value_ << ":" << instance_.value_;
04364     xdata::InfoSpace *is = xdata::InfoSpace::get(oss.str());
04366     is->lock();
04367     std::string item = dynamic_cast<xdata::ItemRetrieveEvent&>(e).itemName();
04368     // Only update those locations which are not always up to date
04369     if      (item == "connectedRBs")
04370       connectedRBs_   = smrbsenders_.size();
04371     else if (item == "memoryUsed")
04372       memoryUsed_     = pool_->getMemoryUsage().getUsed();
04373     else if (item == "receivedVolume")
04374       receivedVolume_   = pmeter_->totalvolumemb();
04375     else if (item == "storedVolume")
04376       //storedVolume_   = pmeter_->totalvolumemb();
04377       storedVolume_   = store_receivedVolume_;
04378     else if (item == "closedFiles") {
04379         std::list<std::string>& files = jc_->get_filelist();
04380         std::list<std::string>& currfiles= jc_->get_currfiles();
04381         closedFiles_ = files.size() - currfiles.size();
04382     } else if (item == "openFiles") {
04383         std::list<std::string>& currfiles= jc_->get_currfiles();
04384         openFiles_ = currfiles.size();
04385     } else if (item == "receivedEventsFromOutMod" || item == "namesOfOutMod") {
04386       receivedEventsFromOutMod_.clear();
04387       namesOfOutMod_.clear();
04389       boost::shared_ptr<InitMsgCollection> initMsgCollection;
04390       if(jc_.get() != NULL && jc_->getInitMsgCollection().get() != NULL) {
04391         initMsgCollection = jc_->getInitMsgCollection();
04392       }
04393       idMap_iter oi(modId2ModOutMap_.begin()), oe(modId2ModOutMap_.end());
04394       for( ; oi != oe; ++oi) {
04395         std::string outputModuleLabel = oi->second;
04396         if (initMsgCollection.get() != NULL &&
04397             initMsgCollection->getOutputModuleName(oi->first) != "") {
04398           outputModuleLabel = initMsgCollection->getOutputModuleName(oi->first);
04399         }
04400         receivedEventsFromOutMod_.push_back(receivedEventsMap_[oi->second]);
04401         namesOfOutMod_.push_back(outputModuleLabel);
04402       }
04403 /* removed for temporary solution of using the monitoring loop
04405     } else if (item == "storedEvents" || item == "storedEventsInStream" || item == "namesOfStream") {
04406       // only clear and get values if in enabled state so latest values available if fail/stop
04407       if(jc_.get() != NULL) {
04408         storedEvents_ = 0;
04409         storedEventsInStream_.clear();
04410         namesOfStream_.clear();
04411         // following is thread safe as size of all_storedEvents is fixed (number of streams)
04412         std::vector<uint32> all_storedEvents = jc_->get_storedEvents();
04413         std::vector<std::string> all_storedNames = jc_->get_storedNames();
04414         for(std::vector<uint32>::iterator it = all_storedEvents.begin(), itEnd = all_storedEvents.end();
04415             it != itEnd; ++it) {
04416               storedEvents_ = storedEvents_ + (*it);
04417               storedEventsInStream_.push_back(*it);
04418         }
04419         for(std::vector<std::string>::iterator it = all_storedNames.begin(), itEnd = all_storedNames.end();
04420             it != itEnd; ++it) {
04421               namesOfStream_.push_back(*it);
04422         }
04423       }
04424 */
04425     } else if (item == "progressMarker")
04426       progressMarker_ = ProgressMarker::instance()->status();
04427     is->unlock();
04428   }
04430   if (e.type()=="ItemChangedEvent" && !(fsm_.stateName()->toString()=="Halted")) {
04431     string item = dynamic_cast<xdata::ItemChangedEvent&>(e).itemName();
04432     if ( item == "STparameterSet") {
04433       reconfigurationRequested_ = true;
04434     }
04435   }
04436 }
04438 void StorageManager::parseFileEntry(const std::string &in, std::string &out, 
04439                                     unsigned int &nev, unsigned long long &sz) const
04440 {
04441   unsigned int no;
04442   stringstream pippo;
04443   pippo << in;
04444   pippo >> no >> out >> nev >> sz;
04445 }
04447 std::string StorageManager::findStreamName(const std::string &in) const
04448 {
04449   //cout << "in findStreamName with string " << in << endl;
04450   string::size_type t = in.find("storageManager");
04452   string::size_type b;
04453   if(t != string::npos)
04454     {
04455       //cout << " storageManager is at " << t << endl;
04456       b = in.rfind(".",t-2);
04457       if(b!=string::npos) 
04458         {
04459           //cout << "looking for substring " << t-b-2 << "long" <<endl;
04460           //cout << " stream name should be at " << b+1 << endl;
04461           //cout << " will return name " << string(in.substr(b+1,t-b-2)) << endl;
04462           return string(in.substr(b+1,t-b-2));
04463         }
04464       else
04465         cout << " stream name is lost " << endl;
04466     }
04467   else
04468     cout << " storageManager is not found " << endl;
04469   return in;
04470 }
04473 bool StorageManager::configuring(toolbox::task::WorkLoop* wl)
04474 {
04475   try {
04476     LOG4CPLUS_INFO(getApplicationLogger(),"Start configuring ...");
04478     configureAction();
04480     LOG4CPLUS_INFO(getApplicationLogger(),"Finished configuring!");
04482     fsm_.fireEvent("ConfigureDone",this);
04483   }
04484   catch (cms::Exception& e) {
04485     reasonForFailedState_ = e.explainSelf();
04486     fsm_.fireFailed(reasonForFailedState_,this);
04487     return false;
04488   }
04489   catch (xcept::Exception &e) {
04490     reasonForFailedState_ = "configuring FAILED: " + (string)e.what();
04491     fsm_.fireFailed(reasonForFailedState_,this);
04492     return false;
04493   }
04494   catch (std::exception& e) {
04495     reasonForFailedState_  = e.what();
04496     fsm_.fireFailed(reasonForFailedState_,this);
04497     return false;
04498   }
04499   catch (...) {
04500     reasonForFailedState_  = "Unknown Exception while configuring";
04501     fsm_.fireFailed(reasonForFailedState_,this);
04502     return false;
04503   }
04505   reconfigurationRequested_ = false;
04506   return false;
04507 }
04510 void StorageManager::configureAction()
04511 {
04512   if(!edmplugin::PluginManager::isAvailable()) {
04513     edmplugin::PluginManager::configure(edmplugin::standard::config());
04514   }
04516   // give the JobController a configuration string and
04517   // get the registry data coming over the network (the first one)
04518   // Note that there is currently no run number check for the INIT
04519   // message, just the first one received once in Enabled state is used
04520   evf::ParameterSetRetriever smpset(offConfig_.value_);
04522   string my_config = smpset.getAsString();
04524   pushMode_ = (bool) pushmode2proxy_;
04525   smConfigString_    = my_config;
04526   smFileCatalog_     = fileCatalog_.toString();
04528   boost::shared_ptr<stor::Parameter> smParameter_ = stor::Configurator::instance()->getParameter();
04529   smParameter_ -> setFileCatalog(fileCatalog_.toString());
04530   smParameter_ -> setfileName(fileName_.toString());
04531   smParameter_ -> setfilePath(filePath_.toString());
04532   smParameter_ -> setmaxFileSize(maxFileSize_.value_);
04533   smParameter_ -> setsetupLabel(setupLabel_.toString());
04534   smParameter_ -> sethighWaterMark(highWaterMark_.value_);
04535   smParameter_ -> setlumiSectionTimeOut(lumiSectionTimeOut_.value_);
04536   smParameter_ -> setExactFileSizeTest(exactFileSizeTest_.value_);
04538   // check output locations and scripts before we continue
04539   checkDirectoryOK(filePath_.toString());
04540   if((bool)archiveDQM_) checkDirectoryOK(filePrefixDQM_.toString());
04542   // check whether the maxSize parameter in an SM output stream
04543   // is still specified in bytes (rather than MBytes).  (All we really
04544   // check is if the maxSize is unreasonably large after converting
04545   // it to bytes.)
04546   //@@EM this is done on the xdaq parameter if it is set (i.e. if >0),
04547   // otherwise on the cfg params
04548   if(smParameter_ ->maxFileSize()>0) {
04549     long long maxSize = 1048576 * (long long) smParameter_ -> maxFileSize();
04550     if (maxSize > 2E+13) {
04551       std::string errorString =  "The maxSize parameter (file size) ";
04552       errorString.append("from xdaq configuration is too large(");
04553       try {
04554         errorString.append(boost::lexical_cast<std::string>(maxSize));
04555       }
04556       catch (boost::bad_lexical_cast& blcExcpt) {
04557         errorString.append("???");
04558       }
04559       errorString.append(" bytes). ");
04560       errorString.append("Please check that this parameter is ");
04561       errorString.append("specified as the number of MBytes, not bytes. ");
04562       errorString.append("(The units for maxSize was changed from ");
04563       errorString.append("bytes to MBytes, and it is possible that ");
04564       errorString.append("your storage manager configuration ");
04565       errorString.append("needs to be updated to reflect this.)");
04567       throw cms::Exception("StorageManager","configureAction")
04568         << errorString;
04569     }
04570   } else {
04571     try {
04572       // create a parameter set from the configuration string
04573       PythonProcessDesc py_pdesc(smConfigString_);
04574       boost::shared_ptr<ProcessDesc> pdesc = py_pdesc.processDesc();
04575       boost::shared_ptr<edm::ParameterSet> smPSet = pdesc->getProcessPSet();
04577       // loop over each end path
04578       std::vector<std::string> allEndPaths = 
04579         smPSet->getParameter<std::vector<std::string> >("@end_paths");
04580       for(std::vector<std::string>::iterator endPathIter = allEndPaths.begin();
04581           endPathIter != allEndPaths.end(); ++endPathIter) {
04583         // loop over each element in the end path list (not sure why...)
04584         std::vector<std::string> anEndPath =
04585           smPSet->getParameter<std::vector<std::string> >((*endPathIter));
04586         for(std::vector<std::string>::iterator ep2Iter = anEndPath.begin();
04587             ep2Iter != anEndPath.end(); ++ep2Iter) {
04589           // fetch the end path parameter set
04590           edm::ParameterSet endPathPSet =
04591             smPSet->getParameter<edm::ParameterSet>((*ep2Iter));
04592           if (! endPathPSet.empty()) {
04593             std::string mod_type =
04594               endPathPSet.getParameter<std::string> ("@module_type");
04595             if (mod_type == "EventStreamFileWriter") {
04596               // convert the maxSize parameter value from MB to bytes
04597               long long maxSize = 1048576 *
04598                 (long long) endPathPSet.getParameter<int> ("maxSize");
04600               // test the maxSize value.  2E13 is somewhat arbitrary,
04601               // but ~18 TeraBytes seems larger than we would realistically
04602               // want, and it will catch stale (byte-based) values greater
04603               // than ~18 MBytes.)
04604               if (maxSize > 2E+13) {
04605                 std::string streamLabel =  endPathPSet.getParameter<std::string> ("streamLabel");
04606                 std::string errorString =  "The maxSize parameter (file size) ";
04607                 errorString.append("for stream ");
04608                 errorString.append(streamLabel);
04609                 errorString.append(" is too large (");
04610                 try {
04611                   errorString.append(boost::lexical_cast<std::string>(maxSize));
04612                 }
04613                 catch (boost::bad_lexical_cast& blcExcpt) {
04614                   errorString.append("???");
04615                 }
04616                 errorString.append(" bytes). ");
04617                 errorString.append("Please check that this parameter is ");
04618                 errorString.append("specified as the number of MBytes, not bytes. ");
04619                 errorString.append("(The units for maxSize was changed from ");
04620                 errorString.append("bytes to MBytes, and it is possible that ");
04621                 errorString.append("your storage manager configuration file ");
04622                 errorString.append("needs to be updated to reflect this.)");
04624                 throw cms::Exception("StorageManager","configureAction")
04625                   << errorString;
04626               }
04627             }
04628           }
04629         }
04630       }
04631     }
04632     catch (...) {
04633       // since the maxSize test is just a convenience, we'll ignore
04634       // exceptions and continue normally, for now.
04635     }
04636   }
04638   if (maxESEventRate_ < 0.0)
04639     maxESEventRate_ = 0.0;
04640   if (maxESDataRate_ < 0.0)
04641     maxESDataRate_ = 0.0;
04642   if (DQMmaxESEventRate_ < 0.0)
04643     DQMmaxESEventRate_ = 0.0;
04645   xdata::Integer cutoff(1);
04646   if (consumerQueueSize_ < cutoff)
04647     consumerQueueSize_ = cutoff;
04648   if (DQMconsumerQueueSize_ < cutoff)
04649     DQMconsumerQueueSize_ = cutoff;
04651   jc_.reset(new stor::JobController(my_config, getApplicationLogger(), &deleteSMBuffer));
04653   int disks(nLogicalDisk_);
04655   jc_->setNumberOfFileSystems(disks);
04656   jc_->setFileCatalog(smFileCatalog_);
04657   jc_->setSourceId(sourceId_);
04659   jc_->setCollateDQM(collateDQM_);
04660   jc_->setArchiveDQM(archiveDQM_);
04661   jc_->setArchiveIntervalDQM(archiveIntervalDQM_);
04662   jc_->setPurgeTimeDQM(purgeTimeDQM_);
04663   jc_->setReadyTimeDQM(readyTimeDQM_);
04664   jc_->setFilePrefixDQM(filePrefixDQM_);
04665   jc_->setUseCompressionDQM(useCompressionDQM_);
04666   jc_->setCompressionLevelDQM(compressionLevelDQM_);
04667   jc_->setFileClosingTestInterval(fileClosingTestInterval_);
04669   boost::shared_ptr<EventServer>
04670     eventServer(new EventServer(maxESEventRate_, maxESDataRate_,
04671                                 esSelectedHLTOutputModule_,
04672                                 fairShareES_));
04673   jc_->setEventServer(eventServer);
04674   boost::shared_ptr<DQMEventServer> DQMeventServer(new DQMEventServer(DQMmaxESEventRate_));
04675   jc_->setDQMEventServer(DQMeventServer);
04676   boost::shared_ptr<InitMsgCollection> initMsgCollection(new InitMsgCollection());
04677   jc_->setInitMsgCollection(initMsgCollection);
04678   jc_->setSMRBSenderList(&smrbsenders_);
04679 }
04682 bool StorageManager::enabling(toolbox::task::WorkLoop* wl)
04683 {
04684   if (reconfigurationRequested_) {
04685     reconfigurationRequested_ = false;
04687     try {
04688       LOG4CPLUS_INFO(getApplicationLogger(),"Start re-configuring ...");
04689       this->haltAction();
04690       this->configureAction();
04691       LOG4CPLUS_INFO(getApplicationLogger(),"Finished re-configuring!");
04692     }
04693     catch (cms::Exception& e) {
04694       reasonForFailedState_ = e.explainSelf();
04695       fsm_.fireFailed(reasonForFailedState_,this);
04696       return false;
04697     }
04698     catch (xcept::Exception &e) {
04699       reasonForFailedState_ = "re-configuring FAILED: " + (string)e.what();
04700       fsm_.fireFailed(reasonForFailedState_,this);
04701       return false;
04702     }
04703     catch (std::exception& e) {
04704       reasonForFailedState_  = e.what();
04705       fsm_.fireFailed(reasonForFailedState_,this);
04706       return false;
04707     }
04708     catch (...) {
04709       reasonForFailedState_  = "Unknown Exception while re-configuring";
04710       fsm_.fireFailed(reasonForFailedState_,this);
04711       return false;
04712     }
04713   }
04715   try {
04716     LOG4CPLUS_INFO(getApplicationLogger(),"Start enabling ...");
04718     smrbsenders_.clear();
04720     fileList_.clear();
04721     eventsInFile_.clear();
04722     storedEventsInStream_.clear();
04723     fileSize_.clear();
04724     storedEvents_ = 0;
04725     receivedEvents_ = 0;
04726     receivedErrorEvents_ = 0;
04727     storedVolume_ = 0;
04728     receivedVolume_ = 0;
04729     receivedEventsFromOutMod_.clear();
04730     namesOfStream_.clear();
04731     namesOfOutMod_.clear();
04732     receivedEventsMap_.clear();
04733     avEventSizeMap_.clear();
04734     avCompressRatioMap_.clear();
04735     modId2ModOutMap_.clear();
04736     storedEventsMap_.clear();
04737     dqmRecords_   = 0;
04738     closedFiles_  = 0;
04739     openFiles_  = 0;
04740     lastEventSeen_ = 0;
04741     lastErrorEventSeen_ = 0;
04742     jc_->start();
04744     boost::shared_ptr<InitMsgCollection> initMsgCollection = jc_->getInitMsgCollection();
04745     if (initMsgCollection.get() != 0) {
04746       initMsgCollection->clear();
04747     }
04749     LOG4CPLUS_INFO(getApplicationLogger(),"Finished enabling!");
04751     fsm_.fireEvent("EnableDone",this);
04752   }
04753   catch (xcept::Exception &e) {
04754     reasonForFailedState_ = "enabling FAILED: " + (string)e.what();
04755     fsm_.fireFailed(reasonForFailedState_,this);
04756     return false;
04757   }
04758   catch(...)
04759   {
04760     reasonForFailedState_  = "Unknown Exception while enabling";
04761     fsm_.fireFailed(reasonForFailedState_,this);
04762     return false;
04763   }
04764   startMonitoringWorkLoop();
04765   return false;
04766 }
04769 bool StorageManager::stopping(toolbox::task::WorkLoop* wl)
04770 {
04771   try {
04772     LOG4CPLUS_INFO(getApplicationLogger(),"Start stopping ...");
04774     stopAction();
04776     LOG4CPLUS_INFO(getApplicationLogger(),"Finished stopping!");
04778     fsm_.fireEvent("StopDone",this);
04779   }
04780   catch (xcept::Exception &e) {
04781     reasonForFailedState_ = "stopping FAILED: " + (string)e.what();
04782     fsm_.fireFailed(reasonForFailedState_,this);
04783     return false;
04784   }
04785   catch(...)
04786   {
04787     reasonForFailedState_  = "Unknown Exception while stopping";
04788     fsm_.fireFailed(reasonForFailedState_,this);
04789     return false;
04790   }
04792   return false;
04793 }
04796 bool StorageManager::halting(toolbox::task::WorkLoop* wl)
04797 {
04798   try {
04799     LOG4CPLUS_INFO(getApplicationLogger(),"Start halting ...");
04801     haltAction();
04803     LOG4CPLUS_INFO(getApplicationLogger(),"Finished halting!");
04805     fsm_.fireEvent("HaltDone",this);
04806   }
04807   catch (xcept::Exception &e) {
04808     reasonForFailedState_ = "halting FAILED: " + (string)e.what();
04809     fsm_.fireFailed(reasonForFailedState_,this);
04810     return false;
04811   }
04812   catch(...)
04813   {
04814     reasonForFailedState_  = "Unknown Exception while halting";
04815     fsm_.fireFailed(reasonForFailedState_,this);
04816     return false;
04817   }
04819   return false;
04820 }
04822 void StorageManager::stopAction()
04823 {
04824   jc_->stop();
04825   jc_->join();
04827   // 08-Oct-2008, KAB
04828   // The file statistics need to be determined after we close
04829   // the files, n'est pas?  And since the file closing is
04830   // done underneath jc.stop(), the following code needs
04831   // to come after jc_->stop(), I believe.
04832   std::list<std::string>& files = jc_->get_filelist();
04833   std::list<std::string>& currfiles= jc_->get_currfiles();
04834   closedFiles_ = files.size() - currfiles.size();
04835   openFiles_ = currfiles.size();
04837   unsigned int totInFile = 0;
04838   for(list<string>::const_iterator it = files.begin();
04839       it != files.end(); ++it)
04840   {
04841       string name;
04842       unsigned int nev;
04843       unsigned long long size;
04844       parseFileEntry((*it),name,nev,size);
04845       fileList_.push_back(name);
04846       eventsInFile_.push_back(nev);
04847       totInFile += nev;
04848       fileSize_.push_back((unsigned int) (size / 1048576));
04849       FDEBUG(5) << name << " " << nev << " " << size << std::endl;
04850   }
04851   receivedEventsFromOutMod_.clear();
04852   namesOfOutMod_.clear();
04854   boost::shared_ptr<InitMsgCollection> initMsgCollection;
04855   if(jc_.get() != NULL && jc_->getInitMsgCollection().get() != NULL) {
04856     initMsgCollection = jc_->getInitMsgCollection();
04857   }
04858   idMap_iter oi(modId2ModOutMap_.begin()), oe(modId2ModOutMap_.end());
04859   for( ; oi != oe; ++oi) {
04860       std::string outputModuleLabel = oi->second;
04861       if (initMsgCollection.get() != NULL &&
04862           initMsgCollection->getOutputModuleName(oi->first) != "") {
04863         outputModuleLabel = initMsgCollection->getOutputModuleName(oi->first);
04864       }
04865       receivedEventsFromOutMod_.push_back(receivedEventsMap_[oi->second]);
04866       namesOfOutMod_.push_back(outputModuleLabel);
04867   }
04868   storedEvents_ = 0;
04869   storedEventsInStream_.clear();
04870   // following is thread safe as size of all_storedEvents is fixed (number of streams)
04871   std::vector<uint32> all_storedEvents = jc_->get_storedEvents();
04872   for(std::vector<uint32>::iterator it = all_storedEvents.begin(), itEnd = all_storedEvents.end();
04873       it != itEnd; ++it) {
04874       storedEvents_ = storedEvents_ + (*it);
04875       storedEventsInStream_.push_back(*it);
04876   }
04878   // should clear the event server(s) last event/queue
04879   boost::shared_ptr<EventServer> eventServer;
04880   boost::shared_ptr<DQMEventServer> dqmeventServer;
04881   if (jc_.get() != NULL)
04882   {
04883     eventServer = jc_->getEventServer();
04884     dqmeventServer = jc_->getDQMEventServer();
04885   }
04886   if (eventServer.get() != NULL) eventServer->clearQueue();
04887   if (dqmeventServer.get() != NULL) dqmeventServer->clearQueue();
04888 }
04890 void StorageManager::haltAction()
04891 {
04892   stopAction();
04894   // make sure serialized product registry is cleared also as its used
04895   // to check state readiness for web transactions
04896   pushMode_ = false;
04898   {
04899     boost::mutex::scoped_lock sl(halt_lock_);
04900     jc_.reset();
04901   }
04902 }
04904 void StorageManager::checkDirectoryOK(const std::string path) const
04905 {
04906   struct stat64 buf;
04908   int retVal = stat64(path.c_str(), &buf);
04909   if(retVal !=0 )
04910   {
04911     edm::LogError("StorageManager") << "Directory or file " << path
04912                                     << " does not exist. Error=" << errno ;
04913     throw cms::Exception("StorageManager","checkDirectoryOK")
04914             << "Directory or file " << path << " does not exist. Error=" << errno << std::endl;
04915   }
04916 }
04920 xoap::MessageReference StorageManager::fsmCallback(xoap::MessageReference msg)
04921   throw (xoap::exception::Exception)
04922 {
04923   return fsm_.commandCallback(msg);
04924 }
04928 void StorageManager::sendDiscardMessage(unsigned int    rbBufferID, 
04929                                         unsigned int    hltInstance,
04930                                         unsigned int    msgType,
04931                                         string          hltClassName)
04932 {
04933   /*
04934   std::cout << "sendDiscardMessage ... " 
04935             << rbBufferID     << "  "
04936             << hltInstance    << "  "
04937             << msgType        << "  "
04938             << hltClassName   << std::endl;
04939   */
04941   set<xdaq::ApplicationDescriptor*> setOfRBs=
04942     getApplicationContext()->getDefaultZone()->
04943     getApplicationDescriptors(hltClassName.c_str());
04945   for (set<xdaq::ApplicationDescriptor*>::iterator 
04946          it=setOfRBs.begin();it!=setOfRBs.end();++it)
04947     {
04948       if ((*it)->getInstance()==hltInstance)
04949         {
04951           stor::FUProxy* proxy =  new stor::FUProxy(getApplicationDescriptor(),
04952                                                     *it,
04953                                                     getApplicationContext(),
04954                                                     pool_);
04955           if ( msgType == I2O_FU_DATA_DISCARD )
04956             proxy -> sendDataDiscard(rbBufferID);       
04957           else if ( msgType == I2O_FU_DQM_DISCARD )
04958             proxy -> sendDQMDiscard(rbBufferID);
04959           else assert("Unknown discard message type" == 0);
04960           delete proxy;
04961         }
04962     }
04963 }
04965 void StorageManager::startMonitoringWorkLoop() throw (evf::Exception)
04966 {
04967   try {
04968     wlMonitoring_=
04969       toolbox::task::getWorkLoopFactory()->getWorkLoop(sourceId_+"Monitoring",
04970                                                        "waiting");
04971     if (!wlMonitoring_->isActive()) wlMonitoring_->activate();
04972     asMonitoring_ = toolbox::task::bind(this,&StorageManager::monitoring,
04973                                       sourceId_+"Monitoring");
04974     wlMonitoring_->submit(asMonitoring_);
04975   }
04976   catch (xcept::Exception& e) {
04977     string msg = "Failed to start workloop 'Monitoring'.";
04978     XCEPT_RETHROW(evf::Exception,msg,e);
04979   }
04980 }
04983 bool StorageManager::monitoring(toolbox::task::WorkLoop* wl)
04984 {
04985   // @@EM if state is already "failed" then no reason to firefailed again 
04986   //      (in fact it will cause problems) so bail out !
04987   if(fsm_.stateName()->toString() == "Failed") return false;
04988   // @@EM Look for exceptions in the FragmentCollector thread, do a state transition if present
04989   if(stor::getSMFC_exceptionStatus()) {
04990     edm::LogError("StorageManager") << "Fatal BURP in FragmentCollector thread detected! \n"
04991        << stor::getSMFC_reason4Exception();
04993     reasonForFailedState_ = stor::getSMFC_reason4Exception();
04994     fsm_.fireFailed(reasonForFailedState_,this);
04995     return false; // stop monitoring workloop after going to failed state
04996   }
04998   ::sleep(10);
04999   if(jc_.get() != NULL && jc_->getInitMsgCollection().get() != NULL &&
05000      jc_->getInitMsgCollection()->size() > 0) {
05001     boost::mutex::scoped_lock sl(halt_lock_);
05002     if(jc_.use_count() != 0) {
05003       // this is needed only if using flashlist infospace (not for the moment)
05004       std::ostringstream oss;
05005       oss << "urn:xdaq-monitorable:" << class_.value_ << ":" << instance_.value_;
05006       xdata::InfoSpace *is = xdata::InfoSpace::get(oss.str());  
05007       is->lock();
05009       // now for separate stored events via monitoring loop (temporary solution?)
05010       // following is thread safe as size of all_storedEvents is fixed (number of streams)
05011       std::vector<uint32> all_storedEvents = jc_->get_storedEvents();
05012       if(all_storedEvents.begin() != all_storedEvents.end())
05013       {
05014         // only reset if there are stored events otherwise on stop stats are reset to zero
05015         // we want to keep them for retrieval
05016         storedEvents_ = 0;
05017         storedEventsInStream_.clear();
05018         namesOfStream_.clear();
05019         std::vector<std::string> all_storedNames = jc_->get_storedNames();
05020         for(std::vector<uint32>::iterator it = all_storedEvents.begin(), itEnd = all_storedEvents.end();
05021             it != itEnd; ++it) {
05022               storedEvents_ = storedEvents_ + (*it);
05023               storedEventsInStream_.push_back(*it);
05024         }
05025         for(std::vector<std::string>::iterator it = all_storedNames.begin(), itEnd = all_storedNames.end();
05026             it != itEnd; ++it) {
05027               namesOfStream_.push_back(*it);
05028         }
05029       }
05030       boost::shared_ptr<stor::SMOnlyStats> stored_stats = jc_->get_stats();
05031       store_samples_ = stored_stats->samples_;
05032       store_period4samples_ = stored_stats->period4samples_;
05033       store_instantBandwidth_ = stored_stats->instantBandwidth_;
05034       store_instantRate_ = stored_stats->instantRate_;
05035       store_instantLatency_ = stored_stats->instantLatency_;
05036       store_totalSamples_ = (unsigned long)stored_stats->totalSamples_;
05037       store_duration_ = stored_stats->duration_;
05038       store_meanBandwidth_ = stored_stats->meanBandwidth_;
05039       store_meanRate_ = stored_stats->meanRate_;
05040       store_meanLatency_ = stored_stats->meanLatency_;
05041       store_maxBandwidth_ = stored_stats->maxBandwidth_;
05042       store_minBandwidth_ = stored_stats->minBandwidth_;
05043       store_instantBandwidth2_ = stored_stats->instantBandwidth2_;
05044       store_instantRate2_ = stored_stats->instantRate2_;
05045       store_instantLatency2_ = stored_stats->instantLatency2_;
05046       store_totalSamples2_ = (unsigned long)stored_stats->totalSamples2_;
05047       store_duration2_ = stored_stats->duration2_;
05048       store_meanBandwidth2_ = stored_stats->meanBandwidth2_;
05049       store_meanRate2_ = stored_stats->meanRate2_;
05050       store_meanLatency2_ = stored_stats->meanLatency2_;
05051       store_maxBandwidth2_ = stored_stats->maxBandwidth2_;
05052       store_minBandwidth2_ = stored_stats->minBandwidth2_;
05053       store_receivedVolume_ = stored_stats->receivedVolume_;
05054       storedVolume_   = store_receivedVolume_;
05056       // end temporary solution
05058       std::list<std::string>& files = jc_->get_filelist();
05060       if(files.size()==0){is->unlock(); return true;}
05061       if(streams_.size()==0) {
05062         for(list<string>::const_iterator it = files.begin();
05063             it != files.end(); ++it)
05064           {
05065             string name;
05066             unsigned int nev;
05067             unsigned long long size;
05068             parseFileEntry((*it),name,nev,size);
05069             string sname = findStreamName(name);
05070             if(sname=="" || sname==name) continue;
05071             if(streams_.find(sname) == streams_.end())
05072               streams_.insert(pair<string,streammon>(sname,streammon()));
05073           }
05075       }
05076       for(ismap it = streams_.begin(); it != streams_.end(); ++it)
05077         {
05078           (*it).second.nclosedfiles_=0;
05079           (*it).second.nevents_ =0;
05080           (*it).second.totSizeInkBytes_=0;
05081         }
05083       for(list<string>::const_iterator it = files.begin();
05084           it != files.end(); ++it)
05085         {
05086           string name;
05087           unsigned int nev;
05088           unsigned long long size;
05089           parseFileEntry((*it),name,nev,size);
05090           string sname = findStreamName(name);
05091           if(sname=="" || sname==name) continue;
05092           if(streams_.find(sname) == streams_.end())
05093             streams_.insert(pair<string,streammon>(sname,streammon()));
05094           streams_[sname].nclosedfiles_++;
05095           streams_[sname].nevents_ += nev;
05096           streams_[sname].totSizeInkBytes_ += size >> 10;
05097         }
05098       is->unlock();
05099     }
05102   }
05104   return true;
05105 }
05108 // *** Provides factory method for the instantiation of SM applications
05109 // should probably use the MACRO? Could a XDAQ version change cause problems?
05110 extern "C" xdaq::Application
05111 *instantiate_StorageManager(xdaq::ApplicationStub * stub)
05112 {
05113   std::cout << "Going to construct a StorageManager instance "
05114             << std::endl;
05115   return new stor::StorageManager(stub);
05116 }

Generated on Tue Jun 9 17:34:59 2009 for CMSSW by  doxygen 1.5.4