CMS 3D CMS Logo

CMSSW_4_4_3_patch1/src/EventFilter/Processor/src/FUEventProcessor.h

Go to the documentation of this file.
00001 #ifndef FUEVENTPROCESSOR_H
00002 #define FUEVENTPROCESSOR_H 1
00003 
00004 #include "EventFilter/Utilities/interface/StateMachine.h"
00005 #include "EventFilter/Utilities/interface/RunBase.h"
00006 #include "EventFilter/Utilities/interface/Css.h"
00007 #include "EventFilter/Utilities/interface/Exception.h"
00008 #include "EventFilter/Utilities/interface/SquidNet.h"
00009 #include "EventFilter/Utilities/interface/Vulture.h"
00010 
00011 #include "EventFilter/Utilities/interface/MasterQueue.h"
00012 #include "EventFilter/Utilities/interface/SlaveQueue.h"
00013 #include "SubProcess.h"
00014 #include "FWEPWrapper.h"
00015 
00016 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00017 
00018 #include "xdaq/Application.h"
00019 #include "xdaq/NamespaceURI.h"
00020 
00021 #include "xdata/String.h"
00022 #include "xdata/Integer.h"
00023 #include "xdata/Boolean.h"
00024 #include "xdata/Vector.h"
00025 #include "xdata/UnsignedInteger32.h"
00026 #include "xdata/ActionListener.h"
00027 #include "xdata/InfoSpaceFactory.h"
00028 
00029 #include "xgi/Input.h"
00030 #include "xgi/Output.h"
00031 #include "xgi/exception/Exception.h"
00032 
00033 #include <sys/time.h>
00034 #include <pthread.h>
00035 
00036 #include <list>
00037 #include <vector>
00038 #include <map>
00039 
00040 namespace evf
00041 {
00042 
00043   /* Placeholder type: to be filled in with a summary from paths; it declares no members yet. */
00044   struct filter {
00045 
00046   };
00047   class CPUStat;
00048   class RateStat;
00049   namespace internal{
00050     // MyCgi: an xgi::Input constructed from ("",0) whose only addition is a public accessor handing out environment_ by reference
00051     class MyCgi : public xgi::Input{
00052     public:
00053       MyCgi() : xgi::Input("",0){}
00054       //      (disabled alternative ctor) MyCgi(xgi::Input &b) : xgi::Input("",0) {environment_ = b.environment_;}
00055       std::map<std::string, std::string, std::less<std::string> > &getEnvironment(){return environment_;} // returns a mutable reference (no copy); environment_ is not declared in MyCgi, so it is inherited from xgi::Input
00056     };
00057   }
00058   class FUEventProcessor : public xdaq::Application, // XDAQ application that holds an evf::StateMachine (fsm_) and a FWEPWrapper (evtProcessor_)
00059                            public xdata::ActionListener // receives xdata events through actionPerformed()
00060   {
00061   public:
00062     //
00063     // construction/destruction
00064     //
00065     XDAQ_INSTANTIATOR(); // XDAQ macro; presumably declares the instantiation hook for this application - confirm against xdaq/Application.h
00066     FUEventProcessor(xdaq::ApplicationStub *s);
00067     virtual ~FUEventProcessor();
00068     
00069 
00070     //
00071     // member functions
00072     //
00073 
00074     // work loop functions to be executed during transitional states (async); each receives a toolbox::task::WorkLoop* and returns bool (meaning of the result is not visible here - TODO confirm in the .cc)
00075     bool configuring(toolbox::task::WorkLoop* wl);
00076     bool enabling(toolbox::task::WorkLoop* wl);
00077     bool stopping(toolbox::task::WorkLoop* wl);
00078     bool halting(toolbox::task::WorkLoop* wl);
00079 
00080     // fsm soap command callback: takes and returns a xoap::MessageReference; its exception specification lists only xoap::exception::Exception
00081     xoap::MessageReference fsmCallback(xoap::MessageReference msg)
00082       throw (xoap::exception::Exception);
00083         
00084     // xdata::ActionListener interface
00085     void actionPerformed(xdata::Event& e);
00086     
00087     // trigger report related helper functions (all currently commented out)
00088     //    std::string triggerReportToString(const edm::TriggerReport& tr);
00089     //    void triggerReportToTable(const edm::TriggerReport& tr);
00090     //    void        printTriggerReport(const edm::TriggerReport& tr);
00091 
00092     // HyperDAQ related functions: web callbacks taking an xgi::Input* / xgi::Output* pair; the first five are declared throw(xgi::exception::Exception)
00093     void defaultWebPage(xgi::Input *in,xgi::Output *out)
00094       throw(xgi::exception::Exception);
00095     void spotlightWebPage(xgi::Input *,xgi::Output *)
00096       throw(xgi::exception::Exception);
00097     void scalersWeb(xgi::Input *,xgi::Output *)
00098       throw(xgi::exception::Exception);
00099     void pathNames(xgi::Input *,xgi::Output *)
00100       throw(xgi::exception::Exception);
00101     void css(xgi::Input *in,xgi::Output *out) throw (xgi::exception::Exception)
00102     {
00103       css_.css(in,out); // forwards the request unchanged to the Css member
00104     }
00105     // further web callbacks, declared without an exception specification; moduleWeb and serviceWeb forward directly to evtProcessor_
00106     void subWeb(xgi::Input *in,xgi::Output *out);
00107     void moduleWeb(xgi::Input *in,xgi::Output *out){evtProcessor_.moduleWeb(in,out);}
00108     void serviceWeb(xgi::Input *in,xgi::Output *out){evtProcessor_.serviceWeb(in,out);}
00109     void microState(xgi::Input *in,xgi::Output *out);
00110     void updater(xgi::Input *in,xgi::Output *out);
00111     void procStat(xgi::Input *in,xgi::Output *out);
00112     void sendMessageOverMonitorQueue(MsgBuf &); // takes the MsgBuf by non-const reference
00113 
00114   private:
00115     // DQM/shm attach and detach helpers; their exception specifications list only evf::Exception
00116     void attachDqmToShm()   throw (evf::Exception);
00117     void detachDqmFromShm() throw (evf::Exception);
00118     // local logging helpers; presumably backed by the logRing_ members declared below - confirm in the .cc
00119     std::string logsAsString();
00120     void localLog(std::string);
00121 
00122     // MPEP functions
00123     void startSupervisorLoop();
00124     void startReceivingLoop();
00125     void startReceivingMonitorLoop();
00126     // calculate scalers information in separate thread
00127     void startScalersWorkLoop() throw (evf::Exception);
00128     bool scalers(toolbox::task::WorkLoop* wl);
00129     void startSummarizeWorkLoop() throw (evf::Exception);
00130     bool summarize(toolbox::task::WorkLoop* wl);
00131     // work loop bodies (same WorkLoop* -> bool shape as the transition functions above) and enable/stop helpers
00132     bool receiving(toolbox::task::WorkLoop* wl);
00133     bool receivingAndMonitor(toolbox::task::WorkLoop* wl);
00134     bool supervisor(toolbox::task::WorkLoop* wl);
00135     bool enableCommon();
00136     bool enableClassic();
00137     //    void enableMPEPMaster();
00138     bool enableMPEPSlave();
00139     bool stopClassic();
00140     void stopSlavesAndAcknowledge();
00141     void makeStaticInfo();
00142 
00143     //
00144     // member data (NOTE(review): declaration order fixes construction order - do not reorder without checking the constructor)
00145     //
00146     
00147     // finite state machine
00148     evf::StateMachine                fsm_;
00149     
00150     // logger
00151     Logger                           log_;
00152 
00153     // edm event processor (held by value through the FWEPWrapper type)
00154     FWEPWrapper                      evtProcessor_;
00155     
00156     // parameters published to XDAQ info space(s)
00157     xdata::String                    url_;
00158     xdata::String                    class_;
00159     xdata::UnsignedInteger32         instance_;
00160     xdata::UnsignedInteger32         runNumber_;
00161     xdata::Boolean                   epInitialized_; 
00162     xdata::String                    configString_;
00163     std::string                      configuration_;
00164 
00165     xdata::Boolean                   outPut_;
00166 
00167     xdata::Boolean                   autoRestartSlaves_;
00168     xdata::UnsignedInteger32         slaveRestartDelaySecs_;
00169 
00170     xdata::Boolean                   hasShMem_;
00171     xdata::Boolean                   hasPrescaleService_;
00172     xdata::Boolean                   hasModuleWebRegistry_;
00173     xdata::Boolean                   hasServiceWebRegistry_;
00174     xdata::Boolean                   isRunNumberSetter_;
00175     xdata::Boolean                   iDieStatisticsGathering_;
00176     bool                             outprev_;
00177     
00178     // application identifier
00179     std::string                      sourceId_;
00180 
00181     // flashlist variables, squids
00182     xdata::Boolean                   squidPresent_; 
00183 
00184     // behavior on error - configurable
00185     
00186     xdata::Boolean                   exitOnError_; 
00187 
00188     // HyperDAQ related (used by css() above)
00189     Css                              css_;
00190 
00191     // Misc
00192     std::string                      reasonForFailedState_;
00193 
00194     SquidNet                         squidnet_;
00195     std::vector<std::string>         logRing_;
00196     unsigned int                     logRingIndex_;
00197     static const unsigned int        logRingSize_ = 50; // in-class constant; presumably the capacity of logRing_ - confirm in the .cc
00198     bool                             logWrap_;
00199     // sub-process bookkeeping
00200     xdata::UnsignedInteger32         nbSubProcesses_;
00201     xdata::UnsignedInteger32         nbSubProcessesReporting_;
00202     std::vector<SubProcess>          subs_;
00203     unsigned int                     nblive_; 
00204     unsigned int                     nbdead_; 
00205 
00206     unsigned int                     nbTotalDQM_;
00207 
00208     // workloop / action signature for message passing (NOTE(review): raw pointers; ownership is not visible in this header)
00209     toolbox::task::WorkLoop         *wlReceiving_;      
00210     toolbox::task::ActionSignature  *asReceiveMsgAndExecute_;
00211     bool                             receiving_;
00212     toolbox::task::WorkLoop         *wlReceivingMonitor_;      
00213     toolbox::task::ActionSignature  *asReceiveMsgAndRead_;
00214     bool                             receivingM_;
00215     SubProcess*                      myProcess_;
00216     toolbox::task::WorkLoop         *wlSupervising_;      
00217     toolbox::task::ActionSignature  *asSupervisor_;
00218     bool                             supervising_;
00219     // info space pointers and POSIX mutexes (NOTE(review): what each mutex guards is not visible here - confirm in the .cc)
00220     xdata::InfoSpace*                monitorInfoSpace_;
00221     xdata::InfoSpace*                monitorLegendaInfoSpace_;
00222     xdata::InfoSpace*                applicationInfoSpace_;
00223     pthread_mutex_t                  stop_lock_;
00224     pthread_mutex_t                  start_lock_;
00225     pthread_mutex_t                  pickup_lock_;
00226     std::string                      updaterStatic_;
00227     xdata::Serializable             *nbProcessed;
00228     xdata::Serializable             *nbAccepted;
00229 
00230     // flashlist variables, scalers
00231     xdata::InfoSpace                *scalersInfoSpace_;
00232     xdata::InfoSpace                *scalersLegendaInfoSpace_;
00233 
00234     //scalers workloop
00235     toolbox::task::WorkLoop         *wlScalers_;      
00236     toolbox::task::ActionSignature  *asScalers_;
00237     bool                             wlScalersActive_;
00238     unsigned int                     scalersUpdates_;
00239 
00240     //summarize workloop
00241     toolbox::task::WorkLoop         *wlSummarize_;      
00242     toolbox::task::ActionSignature  *asSummarize_;
00243     bool                             wlSummarizeActive_;
00244     int                              anonymousPipe_[2]; // two ints; presumably the descriptor pair of a pipe - TODO confirm in the .cc
00245     xdata::Vector<xdata::Integer>    spMStates_;
00246     xdata::Vector<xdata::Integer>    spmStates_;
00247     xdata::UnsignedInteger32         superSleepSec_; 
00248     std::list<std::string>           names_;
00249     xdata::String                    iDieUrl_;
00250     Vulture                         *vulture_;
00251     pid_t                            vp_;
00252     CPUStat                         *cpustat_;
00253     RateStat                        *ratestat_;
00254     // MsgBuf instances held by value; their roles (master/slave, prg/prr/trr/monitoring) are indicated only by their names
00255     MsgBuf                           master_message_prg_;
00256     MsgBuf                           master_message_prr_;
00257     MsgBuf                           slave_message_prr_;
00258     MsgBuf                           slave_message_monitoring_;
00259     MsgBuf                           master_message_trr_;
00260   };
00261   
00262 } // namespace evf
00263 
00264 
00265 #endif