CMS 3D CMS Logo

StorageManager.h

Go to the documentation of this file.
00001 #ifndef _StorageManager_h
00002 #define _StorageManager_h
00003 
00004 /*
00005    Author: Harry Cheung, FNAL
00006 
00007    Description:
00008      Storage Manager XDAQ application. It receives and collects
00009      I2O frames and rebuilds the event data from them.
00010 
00011      See CMS EventFilter wiki page for further notes.
00012 
00013    $Id: StorageManager.h,v 1.46 2008/12/19 23:32:36 biery Exp $
00014 */
00015 
00016 #include <string>
00017 #include <list>
00018 #include <map>
00019 
00020 #include "FWCore/PluginManager/interface/ProblemTracker.h"
00021 #include "FWCore/Utilities/interface/Exception.h"
00022 #include "FWCore/MessageService/interface/MessageServicePresence.h"
00023 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00024 
00025 #include "EventFilter/Utilities/interface/Exception.h"
00026 #include "EventFilter/Utilities/interface/Css.h"
00027 #include "EventFilter/Utilities/interface/RunBase.h"
00028 #include "EventFilter/Utilities/interface/StateMachine.h"
00029 
00030 #include "EventFilter/StorageManager/interface/JobController.h"
00031 #include "EventFilter/StorageManager/interface/SMPerformanceMeter.h"
00032 #include "EventFilter/StorageManager/interface/ForeverAverageCounter.h"
00033 #include "EventFilter/StorageManager/interface/SMFUSenderList.h"
00034 
00035 #include "FWCore/PluginManager/interface/PluginManager.h"
00036 
00037 #include "toolbox/mem/Reference.h"
00038 
00039 #include "xdaq/Application.h"
00040 #include "xdaq/ApplicationContext.h"
00041 
00042 #include "xdata/String.h"
00043 #include "xdata/UnsignedInteger32.h"
00044 #include "xdata/Integer.h"
00045 #include "xdata/Double.h"
00046 #include "xdata/Boolean.h"
00047 #include "xdata/Vector.h"
00048 
00049 #include "xgi/Input.h"
00050 #include "xgi/Output.h"
00051 #include "xgi/exception/Exception.h"
00052 
00053 #include "boost/shared_ptr.hpp"
00054 #include "boost/thread/thread.hpp"
00055 
00056 
00057 namespace stor {
00058 
        // StorageManager is the Storage Manager XDAQ application class: it
        // derives from xdaq::Application and additionally implements
        // xdata::ActionListener and evf::RunBase.  Per the file header, it
        // receives and collects I2O frames in order to rebuild event data.
        //
        // Only css() has an inline body in this header; every other member
        // function is merely declared here.
        //
        // NOTE(review): the dynamic exception specifications (throw(...)) used
        // throughout this class are deprecated since C++11 and are not valid in
        // C++17 or later; this header needs an older language mode to compile.
00059   class StorageManager: public xdaq::Application, 
00060                         public xdata::ActionListener,
00061                         public evf::RunBase
00062   {
00063    public:
          // Constructs the application from the XDAQ application stub `s`.
          // Declared as possibly throwing xdaq::exception::Exception.
00064     StorageManager(xdaq::ApplicationStub* s) throw (xdaq::exception::Exception);
00065   
00066     ~StorageManager();
00067 
00068     // *** Updates the exported parameters
          // Takes a SOAP message and returns a SOAP message; declared as
          // possibly throwing xoap::exception::Exception.
00069     xoap::MessageReference ParameterGet(xoap::MessageReference message)
00070     throw (xoap::exception::Exception);
00071 
00072     // *** Anything to do with the flash list
00073     void setupFlashList();
          // Presumably the xdata::ActionListener callback -- TODO confirm
          // against the xdata::ActionListener interface.
00074     void actionPerformed(xdata::Event& e);
00075 
00076     // *** Callbacks to be executed during transitional states
          // Each receives the work loop it is running on and returns bool; the
          // meaning of the return value is not visible in this header.
00077     bool configuring(toolbox::task::WorkLoop* wl);
00078     bool enabling(toolbox::task::WorkLoop* wl);
00079     bool stopping(toolbox::task::WorkLoop* wl);
00080     bool halting(toolbox::task::WorkLoop* wl);
00081 
00082     // *** FSM soap command callback
00083     xoap::MessageReference fsmCallback(xoap::MessageReference msg)
00084       throw (xoap::exception::Exception);
00085     // @@EM added monitoring workloop
          // startMonitoringWorkLoop() is declared as possibly throwing
          // evf::Exception; monitoring() has the same signature as the
          // transitional-state callbacks above.
00086     void startMonitoringWorkLoop() throw (evf::Exception);
00087     bool monitoring(toolbox::task::WorkLoop* wl);
00088     
00090    private:  
          // Handlers for incoming frames, one per message kind (registry, data,
          // error data, DQM).  Each takes a toolbox::mem::Reference pointer.
          // NOTE(review): who releases `ref` is not visible here -- confirm in
          // the implementation file.
00091     void receiveRegistryMessage(toolbox::mem::Reference *ref);
00092     void receiveDataMessage(toolbox::mem::Reference *ref);
00093     void receiveErrorDataMessage(toolbox::mem::Reference *ref);
00094     void receiveDQMMessage(toolbox::mem::Reference *ref);
00095 
          // The parameters are unnamed in this declaration; see the definition
          // for what the three unsigned ints and the string stand for.
00096     void sendDiscardMessage(unsigned int, 
00097                             unsigned int, 
00098                             unsigned int, 
00099                             std::string);
00100 
          // Presumably the work behind the configuring/stopping/halting
          // callbacks above -- TODO confirm in the implementation file.
00101     void configureAction();
00102     void stopAction();
00103     void haltAction();
00104 
          // Returns void, so a failed check is presumably reported by throwing
          // -- TODO confirm.
          // NOTE(review): `dir` is taken by value (one string copy per call); a
          // const reference would avoid that, but the out-of-line definition
          // would have to change in step.
00105     void checkDirectoryOK(const std::string dir) const;
00106 
          // *** XGI web page handlers.  All share the (xgi::Input*, xgi::Output*)
          // signature and are declared as possibly throwing
          // xgi::exception::Exception.
00107     void defaultWebPage
00108       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
          // css() simply forwards the request to the css_ member.
00109     void css(xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception)
00110       {css_.css(in,out);}
00111     void rbsenderWebPage
00112       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00113     void streamerOutputWebPage
00114       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00115     void eventdataWebPage
00116       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00117     void headerdataWebPage
00118       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00119     void consumerWebPage
00120       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00121     void consumerListWebPage
00122       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00123     void eventServerWebPage
00124       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00125     void DQMeventdataWebPage
00126       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00127     void DQMconsumerWebPage
00128       (xgi::Input *in, xgi::Output *out) throw (xgi::exception::Exception);
00129 
00130 
          // `in` is read-only; `out`, `nev` and `sz` are non-const references
          // and are presumably filled in by the call (file name, number of
          // events, size) -- TODO confirm the entry format in the definition.
00131     void parseFileEntry(const std::string &in, std::string &out, unsigned int &nev, unsigned long long &sz) const;
00132 
          // Returns a stream name derived from `in`; the derivation rule is not
          // visible in this header.
00133     std::string findStreamName(const std::string &in) const;
00134         
00135     // *** state machine related
00136     evf::StateMachine fsm_;
00137     std::string       reasonForFailedState_;
00138 
          // NOTE(review): raw pointer; ownership/lifetime of the AssertHandler
          // is not visible here.
00139     edm::AssertHandler *ah_;
00140     edm::service::MessageServicePresence theMessageServicePresence;
00141     xdata::String offConfig_;
00142   
00143     boost::shared_ptr<stor::JobController> jc_;
          // What halt_lock_ guards is not visible here (presumably jc_ during
          // halt -- TODO confirm).
          // NOTE(review): boost::mutex is used although only
          // boost/thread/thread.hpp is included among the Boost.Thread headers
          // in this file.
00144     boost::mutex                           halt_lock_;
00145 
          // *** xdata-typed configuration values (presumably exported as
          // application parameters -- TODO confirm in the constructor)
00146     xdata::Boolean pushmode2proxy_;
00147     xdata::Integer nLogicalDisk_;
00148     xdata::String  fileName_;
00149     xdata::String  filePath_;
00150     xdata::Integer maxFileSize_;
00151     xdata::String  setupLabel_;
00152     xdata::Double  highWaterMark_;
00153     xdata::Double  lumiSectionTimeOut_;
00154     xdata::String  fileCatalog_;
00155     xdata::Boolean exactFileSizeTest_;
00156     xdata::Integer fileClosingTestInterval_;
00157 
          // *** plain (non-xdata) state
00158     bool pushMode_;
00159     std::string smConfigString_;
00160     std::string smFileCatalog_;
00161     bool reconfigurationRequested_;
00162 
          // *** DQM-related xdata values
00163     xdata::Boolean collateDQM_;
00164     xdata::Boolean archiveDQM_;
00165     xdata::Integer archiveIntervalDQM_;
00166     xdata::String  filePrefixDQM_;
00167     xdata::Integer purgeTimeDQM_;
00168     xdata::Integer readyTimeDQM_;
00169     xdata::Boolean useCompressionDQM_;
00170     xdata::Integer compressionLevelDQM_;
00171 
00172     evf::Css css_;  // target of the css() web handler above
00173     xdata::UnsignedInteger32 receivedFrames_;
          // NOTE(review): pool_is_set_ is an int, presumably used as a flag for
          // pool_ -- TODO confirm.  Ownership of pool_ is not visible here.
00174     int pool_is_set_;
00175     toolbox::mem::Pool *pool_;
00176 
00177     // added for Event Server
          // NOTE(review): std::vector is used below, but <vector> is not among
          // the headers this file includes directly; it is presumably pulled
          // in transitively -- confirm.
00178     std::vector<unsigned char> mybuffer_; //temporary buffer instead of using stack
00179     xdata::Double maxESEventRate_;  // hertz
00180     xdata::Double maxESDataRate_;  // MB/sec
00181     xdata::Integer activeConsumerTimeout_;  // seconds
00182     xdata::Integer idleConsumerTimeout_;  // seconds
00183     xdata::Integer consumerQueueSize_;
00184     xdata::Boolean fairShareES_;
00185     xdata::Double DQMmaxESEventRate_;  // hertz
00186     xdata::Integer DQMactiveConsumerTimeout_;  // seconds
00187     xdata::Integer DQMidleConsumerTimeout_;  // seconds
00188     xdata::Integer DQMconsumerQueueSize_;
00189     boost::mutex consumerInitMsgLock_;
00190     xdata::String esSelectedHLTOutputModule_;
00191 
00192     SMFUSenderList smrbsenders_;
00193     xdata::UnsignedInteger32 connectedRBs_;
00194 
          // *** event/file counters and per-file, per-stream, per-output-module
          // vectors
00195     xdata::UnsignedInteger32 storedEvents_;
00196     xdata::UnsignedInteger32 receivedEvents_;
00197     xdata::UnsignedInteger32 receivedErrorEvents_;
00198     xdata::UnsignedInteger32 dqmRecords_;
00199     xdata::UnsignedInteger32 closedFiles_;
00200     xdata::UnsignedInteger32 openFiles_;
00201     xdata::Vector<xdata::String> fileList_;
00202     xdata::Vector<xdata::UnsignedInteger32> eventsInFile_;
00203     xdata::Vector<xdata::UnsignedInteger32> storedEventsInStream_;
00204     xdata::Vector<xdata::UnsignedInteger32> receivedEventsFromOutMod_;
          // countMap: string -> uint32 counter; valueMap: string -> shared
          // ForeverAverageCounter; idMap: uint32 id -> string.
          // NOTE(review): uint32 is not a standard C++ type; it must be
          // provided by one of the included project headers.
00205     typedef std::map<std::string,uint32> countMap;
00206     typedef std::map<std::string, boost::shared_ptr<ForeverAverageCounter> > valueMap;
00207     typedef std::map<uint32,std::string> idMap;
00208     typedef std::map<uint32,std::string>::iterator idMap_iter;
00209     countMap receivedEventsMap_;
00210     valueMap avEventSizeMap_;
00211     valueMap avCompressRatioMap_;
00212     idMap modId2ModOutMap_;
00213     countMap storedEventsMap_;
00214     xdata::Vector<xdata::UnsignedInteger32> fileSize_;
00215     xdata::Vector<xdata::String> namesOfStream_;
00216     xdata::Vector<xdata::String> namesOfOutMod_;
00217 
00218     // *** for received data performance measurements
00219     void addMeasurement(unsigned long size);
          // NOTE(review): raw pointer; ownership of the meter is not visible
          // here.
00220     stor::SMPerformanceMeter *pmeter_;
00221 
00222     // *** measurements for last set of samples
00223     xdata::UnsignedInteger32 samples_; // number of samples/frames per measurement
00224     xdata::UnsignedInteger32 period4samples_; // time period per measurement
00225     xdata::Double instantBandwidth_; // bandwidth in MB/s
00226     xdata::Double instantRate_;      // number of frames/s
00227     xdata::Double instantLatency_;   // micro-seconds/frame
00228     xdata::Double maxBandwidth_;     // maximum bandwidth in MB/s
00229     xdata::Double minBandwidth_;     // minimum bandwidth in MB/s
00230     // *** measurements for last time period
00231     xdata::Double instantBandwidth2_;// bandwidth in MB/s
00232     xdata::Double instantRate2_;     // number of frames/s
00233     xdata::Double instantLatency2_;  // micro-seconds/frame
00234     xdata::Double maxBandwidth2_;    // maximum bandwidth in MB/s
00235     xdata::Double minBandwidth2_;    // minimum bandwidth in MB/s
00236 
00237     // *** measurements for all samples
          // NOTE(review): the trailing comments on totalSamples_ and
          // totalSamples2_ repeat the per-measurement wording of samples_;
          // presumably these hold overall totals -- confirm.
00238     xdata::Double duration_;         // time for run in seconds
00239     xdata::UnsignedInteger32 totalSamples_; //number of samples/frames per measurement
00240     xdata::Double meanBandwidth_;    // bandwidth in MB/s
00241     xdata::Double meanRate_;         // number of frames/s
00242     xdata::Double meanLatency_;      // micro-seconds/frame
00243     xdata::Double receivedVolume_;   // total received data in MB
00244 
00245     xdata::Double duration2_;         // time for run in seconds
00246     xdata::UnsignedInteger32 totalSamples2_; //number of samples/frames per measurement
00247     xdata::Double meanBandwidth2_;    // bandwidth in MB/s
00248     xdata::Double meanRate2_;         // number of frames/s
00249     xdata::Double meanLatency2_;      // micro-seconds/frame
00250 
00251     // *** for stored data performance measurements
          // The store_* members mirror the received-data members above, name
          // for name.
00252     // *** measurements for last set of samples
00253     xdata::UnsignedInteger32 store_samples_; // number of samples/frames per measurement
00254     xdata::UnsignedInteger32 store_period4samples_; // time period per measurement
00255     xdata::Double store_instantBandwidth_; // bandwidth in MB/s
00256     xdata::Double store_instantRate_;      // number of frames/s
00257     xdata::Double store_instantLatency_;   // micro-seconds/frame
00258     xdata::Double store_maxBandwidth_;     // maximum bandwidth in MB/s
00259     xdata::Double store_minBandwidth_;     // minimum bandwidth in MB/s
00260     // *** measurements for last time period
00261     xdata::Double store_instantBandwidth2_;// bandwidth in MB/s
00262     xdata::Double store_instantRate2_;     // number of frames/s
00263     xdata::Double store_instantLatency2_;  // micro-seconds/frame
00264     xdata::Double store_maxBandwidth2_;    // maximum bandwidth in MB/s
00265     xdata::Double store_minBandwidth2_;    // minimum bandwidth in MB/s
00266 
00267     // *** measurements for all samples
00268     xdata::Double store_duration_;         // time for run in seconds
00269     xdata::UnsignedInteger32 store_totalSamples_; //number of samples/frames per measurement
00270     xdata::Double store_meanBandwidth_;    // bandwidth in MB/s
00271     xdata::Double store_meanRate_;         // number of frames/s
00272     xdata::Double store_meanLatency_;      // micro-seconds/frame
00273     xdata::Double store_receivedVolume_;   // total received data in MB
00274 
00275     xdata::Double store_duration2_;         // time for run in seconds
00276     xdata::UnsignedInteger32 store_totalSamples2_; //number of samples/frames per measurement
00277     xdata::Double store_meanBandwidth2_;    // bandwidth in MB/s
00278     xdata::Double store_meanRate2_;         // number of frames/s
00279     xdata::Double store_meanLatency2_;      // micro-seconds/frame
00280 
00281     // *** additional flashlist contents (rest was already there)
00282     xdata::String            class_;
00283     xdata::UnsignedInteger32 instance_;
00284     xdata::String            url_;       
00285 
00286     xdata::Double            storedVolume_;
00287     xdata::UnsignedInteger32 memoryUsed_;
00288     xdata::String            progressMarker_;
00289 
00290     // @@EM workloop / action signature for monitoring
          // NOTE(review): raw pointers; ownership is not visible here
          // (presumably set up by startMonitoringWorkLoop() -- confirm).
00291     toolbox::task::WorkLoop         *wlMonitoring_;      
00292     toolbox::task::ActionSignature  *asMonitoring_;
00293 
00294     // @@EM parameters monitored by workloop (not in flashlist just yet) 
          // Three plain int counters per entry.  Units of the last one follow
          // its name (kBytes); nothing here initializes the fields.
00295     struct streammon{
00296       int               nclosedfiles_;
00297       int               nevents_;
00298       int               totSizeInkBytes_;
00299     };
00300 
          // streams_ maps a std::string key (presumably the stream name --
          // TODO confirm) to its streammon counters.
00301     typedef std::map<std::string,streammon> smap;
00302     typedef std::map<std::string,streammon>::iterator ismap;
00303     smap         streams_;
00304 
00305     unsigned int lastEventSeen_; // report last seen event id
00306     unsigned int lastErrorEventSeen_; // report last error event id seen
00307     boost::mutex rblist_lock_;  // quick (temporary) fix for registration problem
00308 
00309     std::string sm_cvs_version_;
00310 
          // Presumably the defaults for purgeTimeDQM_ and readyTimeDQM_, in
          // seconds -- TODO confirm where these are used.
00311     enum
00312     {
00313       DEFAULT_PURGE_TIME = 120,
00314       DEFAULT_READY_TIME = 30
00315     };
00316 
00317   }; 
00318 } 
00319 
00320 #endif

Generated on Tue Jun 9 17:34:55 2009 for CMSSW by  doxygen 1.5.4