CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_0/src/EventFilter/Processor/src/FUEventProcessor.h

Go to the documentation of this file.
00001 #ifndef FUEVENTPROCESSOR_H
00002 #define FUEVENTPROCESSOR_H 1
00003 
00004 #include "EventFilter/Utilities/interface/StateMachine.h"
00005 #include "EventFilter/Utilities/interface/RunBase.h"
00006 #include "EventFilter/Utilities/interface/Css.h"
00007 #include "EventFilter/Utilities/interface/Exception.h"
00008 #include "EventFilter/Utilities/interface/SquidNet.h"
00009 #include "EventFilter/Utilities/interface/Vulture.h"
00010 
00011 #include "EventFilter/Utilities/interface/MasterQueue.h"
00012 #include "EventFilter/Utilities/interface/SlaveQueue.h"
00013 #include "EventFilter/Utilities/interface/ModuleWeb.h"
00014 #include "EventFilter/Utilities/interface/ModuleWebRegistry.h"
00015 #include "EventFilter/Modules/interface/ShmOutputModuleRegistry.h"
00016 #include "SubProcess.h"
00017 #include "FWEPWrapper.h"
00018 
00019 #include "DataFormats/Provenance/interface/ModuleDescription.h"
00020 
00021 #include "xdaq/Application.h"
00022 #include "xdaq/NamespaceURI.h"
00023 
00024 #include "xdata/String.h"
00025 #include "xdata/Integer.h"
00026 #include "xdata/Boolean.h"
00027 #include "xdata/Vector.h"
00028 #include "xdata/UnsignedInteger32.h"
00029 #include "xdata/ActionListener.h"
00030 #include "xdata/InfoSpaceFactory.h"
00031 
00032 #include "xgi/Input.h"
00033 #include "xgi/Output.h"
00034 #include "xgi/exception/Exception.h"
00035 
00036 #include <sys/time.h>
00037 #include <pthread.h>
00038 
00039 #include <list>
00040 #include <vector>
00041 #include <map>
00042 
00043 namespace evf
00044 {
00045 
00046   /* to be filled in with summary from paths */
00047   struct filter {
00048 
00049   };
00050   class CPUStat;
00051   class RateStat;
00052   namespace internal{
00053     
00054     class MyCgi : public xgi::Input{
00055     public:
00056       MyCgi() : xgi::Input("",0){}
00057       //      MyCgi(xgi::Input &b) : xgi::Input("",0) {environment_ = b.environment_;}
00058       std::map<std::string, std::string, std::less<std::string> > &getEnvironment(){return environment_;}
00059     };
00060   }
00061   class FUEventProcessor : public xdaq::Application,
00062                            public xdata::ActionListener
00063   {
00064   public:
00065     //
00066     // construction/destruction
00067     //
00068     XDAQ_INSTANTIATOR();
00069     FUEventProcessor(xdaq::ApplicationStub *s);
00070     virtual ~FUEventProcessor();
00071     
00072 
00073     //
00074     // member functions
00075     //
00076 
00077     // work loop functions to be executed during transitional states (async)
00078     bool configuring(toolbox::task::WorkLoop* wl);
00079     bool enabling(toolbox::task::WorkLoop* wl);
00080     bool stopping(toolbox::task::WorkLoop* wl);
00081     bool halting(toolbox::task::WorkLoop* wl);
00082 
00083     // fsm soap command callback
00084     xoap::MessageReference fsmCallback(xoap::MessageReference msg)
00085       throw (xoap::exception::Exception);
00086         
00087     // xdata:ActionListener interface
00088     void actionPerformed(xdata::Event& e);
00089     
00090     // trigger report related helper functions
00091     //    std::string triggerReportToString(const edm::TriggerReport& tr);
00092     //    void triggerReportToTable(const edm::TriggerReport& tr);
00093     //    void        printTriggerReport(const edm::TriggerReport& tr);
00094 
00095     // HyperDAQ related functions
00096     void defaultWebPage(xgi::Input *in,xgi::Output *out)
00097       throw(xgi::exception::Exception);
00098     void spotlightWebPage(xgi::Input *,xgi::Output *)
00099       throw(xgi::exception::Exception);
00100     void scalersWeb(xgi::Input *,xgi::Output *)
00101       throw(xgi::exception::Exception);
00102     void pathNames(xgi::Input *,xgi::Output *)
00103       throw(xgi::exception::Exception);
00104     void css(xgi::Input *in,xgi::Output *out) throw (xgi::exception::Exception)
00105     {
00106       css_.css(in,out);
00107     }
00108 
00109     void subWeb(xgi::Input *in,xgi::Output *out);
00110     void moduleWeb(xgi::Input *in,xgi::Output *out){evtProcessor_.moduleWeb(in,out);}
00111     void serviceWeb(xgi::Input *in,xgi::Output *out){evtProcessor_.serviceWeb(in,out);}
00112     void microState(xgi::Input *in,xgi::Output *out);
00113     void updater(xgi::Input *in,xgi::Output *out);
00114     void procStat(xgi::Input *in,xgi::Output *out);
00115     void sendMessageOverMonitorQueue(MsgBuf &);
00116 
00117     static void forkProcessFromEDM_helper(void * addr);
00118 
00119   private:
00120 
00121 
00122     void forkProcessesFromEDM();
00123 
00124     bool enableForkInEDM();
00125     bool restartForkInEDM(unsigned int slotId);
00126     bool doEndRunInEDM();
00127 
00128     void setAttachDqmToShm() throw (evf::Exception);
00129     void attachDqmToShm()    throw (evf::Exception);
00130     void detachDqmFromShm()  throw (evf::Exception);
00131 
00132     std::string logsAsString();
00133     void localLog(std::string);
00134 
00135     // MPEP functions
00136     void startSupervisorLoop();
00137     void startReceivingLoop();
00138     void startReceivingMonitorLoop();
00139     // calculate scalers information in separate thread
00140     void startScalersWorkLoop() throw (evf::Exception);
00141     bool scalers(toolbox::task::WorkLoop* wl);
00142     void startSummarizeWorkLoop() throw (evf::Exception);
00143     bool summarize(toolbox::task::WorkLoop* wl);
00144 
00145     bool receiving(toolbox::task::WorkLoop* wl);
00146     bool receivingAndMonitor(toolbox::task::WorkLoop* wl);
00147     bool supervisor(toolbox::task::WorkLoop* wl);
00148     bool enableCommon();
00149     bool enableClassic();
00150     //    void enableMPEPMaster();
00151     bool enableMPEPSlave();
00152     bool stopClassic();
00153     void stopSlavesAndAcknowledge();
00154     void makeStaticInfo();
00155 
00156     //
00157     // member data
00158     //
00159     
00160     // finite state machine
00161     evf::StateMachine                fsm_;
00162     
00163     // logger
00164     Logger                           log_;
00165 
00166     // edm event processor
00167     FWEPWrapper                      evtProcessor_;
00168     
00169     // parameters published to XDAQ info space(s)
00170     xdata::String                    url_;
00171     xdata::String                    class_;
00172     xdata::UnsignedInteger32         instance_;
00173     xdata::UnsignedInteger32         runNumber_;
00174     xdata::Boolean                   epInitialized_; 
00175     xdata::String                    configString_;
00176     std::string                      configuration_;
00177 
00178     xdata::Boolean                   outPut_;
00179 
00180     xdata::Boolean                   autoRestartSlaves_;
00181     xdata::UnsignedInteger32         slaveRestartDelaySecs_;
00182 
00183     xdata::Boolean                   hasShMem_;
00184     xdata::Boolean                   hasPrescaleService_;
00185     xdata::Boolean                   hasModuleWebRegistry_;
00186     xdata::Boolean                   hasServiceWebRegistry_;
00187     xdata::Boolean                   isRunNumberSetter_;
00188     xdata::Boolean                   iDieStatisticsGathering_;
00189     bool                             outprev_;
00190     
00191     // application identifier
00192     std::string                      sourceId_;
00193 
00194     // flashlist variables, squids
00195     xdata::Boolean                   squidPresent_; 
00196 
00197     // behavior on error - configurable
00198     
00199     xdata::Boolean                   exitOnError_; 
00200 
00201     // HyperDAQ related
00202     Css                              css_;
00203 
00204     // Misc
00205     std::string                      reasonForFailedState_;
00206 
00207     SquidNet                         squidnet_;
00208     std::vector<std::string>         logRing_;
00209     unsigned int                     logRingIndex_;
00210     static const unsigned int        logRingSize_ = 50;
00211     bool                             logWrap_;
00212 
00213     xdata::UnsignedInteger32         nbSubProcesses_;
00214     xdata::UnsignedInteger32         nbSubProcessesReporting_;
00215     xdata::UnsignedInteger32         forkInEDM_;
00216     std::vector<SubProcess>          subs_;
00217     unsigned int                     nblive_; 
00218     unsigned int                     nbdead_; 
00219 
00220     unsigned int                     nbTotalDQM_;
00221 
00222     // workloop / action signature for message passing
00223     toolbox::task::WorkLoop         *wlReceiving_;      
00224     toolbox::task::ActionSignature  *asReceiveMsgAndExecute_;
00225     bool                             receiving_;
00226     toolbox::task::WorkLoop         *wlReceivingMonitor_;      
00227     toolbox::task::ActionSignature  *asReceiveMsgAndRead_;
00228     bool                             receivingM_;
00229     SubProcess*                      myProcess_;
00230     toolbox::task::WorkLoop         *wlSupervising_;      
00231     toolbox::task::ActionSignature  *asSupervisor_;
00232     bool                             supervising_;
00233 
00234     xdata::InfoSpace*                monitorInfoSpace_;
00235     xdata::InfoSpace*                monitorLegendaInfoSpace_;
00236     xdata::InfoSpace*                applicationInfoSpace_;
00237     pthread_mutex_t                  stop_lock_;
00238     pthread_mutex_t                  start_lock_;
00239     pthread_mutex_t                  pickup_lock_;
00240     std::string                      updaterStatic_;
00241     xdata::Serializable             *nbProcessed;
00242     xdata::Serializable             *nbAccepted;
00243 
00244     // flahslist variables, scalers
00245     xdata::InfoSpace                *scalersInfoSpace_;
00246     xdata::InfoSpace                *scalersLegendaInfoSpace_;
00247 
00248     //scalers workloop
00249     toolbox::task::WorkLoop         *wlScalers_;      
00250     toolbox::task::ActionSignature  *asScalers_;
00251     bool                             wlScalersActive_;
00252     unsigned int                     scalersUpdates_;
00253 
00254     //summarize workloop
00255     toolbox::task::WorkLoop         *wlSummarize_;      
00256     toolbox::task::ActionSignature  *asSummarize_;
00257     bool                             wlSummarizeActive_;
00258     int                              anonymousPipe_[2];
00259     xdata::Vector<xdata::Integer>    spMStates_;
00260     xdata::Vector<xdata::Integer>    spmStates_;
00261     xdata::UnsignedInteger32         superSleepSec_; 
00262     std::list<std::string>           names_;
00263     xdata::String                    iDieUrl_;
00264     Vulture                         *vulture_;
00265     pid_t                            vp_;
00266     CPUStat                         *cpustat_;
00267     RateStat                        *ratestat_;
00268 
00269     ModuleWebRegistry                *mwrRef_;
00270     ShmOutputModuleRegistry          *sorRef_;
00271     MsgBuf                           master_message_prg_;
00272     MsgBuf                           master_message_prr_;
00273     MsgBuf                           slave_message_prr_;
00274     MsgBuf                           slave_message_monitoring_;
00275     MsgBuf                           master_message_trr_;
00276 
00277     moduleweb::ForkInfoObj           *forkInfoObj_;
00278     pthread_mutex_t                  forkObjLock_;
00279     bool                             edm_init_done_;
00280   };
00281   
00282 } // namespace evf
00283 
00284 
00285 #endif