CMS 3D CMS Logo

FragmentCollector.h

Go to the documentation of this file.
00001 #ifndef HLT_FRAG_COLL_HPP
00002 #define HLT_FRAG_COLL_HPP
00003 
00004 /*
00005   All buffers passed in on queues are owned by the fragment collector.
00006 
00007   JBK - I think the frag_q is going to need to be a pair of pointers.
00008   The first is the address of the object that needs to be deleted 
00009   using the Deleter function.  The second is the address of the buffer
00010   of information that we manipulate (payload of the received object).
00011 
00012   The code should allow for deleters to be functors or simple functions.
00013  */
00014 
00015 #include "IOPool/Streamer/interface/HLTInfo.h"
00016 #include "IOPool/Streamer/interface/EventBuffer.h"
00017 #include "IOPool/Streamer/interface/Utilities.h"
00018 #include "IOPool/Streamer/interface/MsgTools.h"
00019 #include "IOPool/Streamer/interface/EventMessage.h"
00020 
00021 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00022 
00023 #include "EventFilter/StorageManager/interface/EvtMsgRingBuffer.h"
00024 #include "EventFilter/StorageManager/interface/EventServer.h"
00025 #include "EventFilter/StorageManager/interface/DQMEventServer.h"
00026 #include "EventFilter/StorageManager/interface/ServiceManager.h"
00027 #include "EventFilter/StorageManager/interface/DQMServiceManager.h"
00028 #include "EventFilter/StorageManager/interface/InitMsgCollection.h"
00029 #include "EventFilter/StorageManager/interface/SMPerformanceMeter.h"
00030 #include "EventFilter/StorageManager/interface/SMFUSenderList.h"
00031 
00032 #include "boost/shared_ptr.hpp"
00033 #include "boost/thread/thread.hpp"
00034 
00035 #include "log4cplus/logger.h"
00036 
00037 #include <vector>
00038 #include <map>
00039 #include <string>
00040 
00041 namespace stor
00042 {
00043   struct FragmentContainer
00044   {
          // Bookkeeping record for one set of fragments: the fragments received
          // so far plus two timestamps. It is the mapped type of
          // FragmentCollector::Collection (one container per stor::FragKey).

          // Default construction stamps creationTime_ with the current time
          // (time(0)) and sets lastFragmentTime_ to 0; fragmentMap_ starts empty.
          // NOTE(review): time()/time_t come from <ctime>/<time.h>, which this
          // header does not include directly - it relies on a transitive include.
00045     FragmentContainer():creationTime_(time(0)),lastFragmentTime_(0) { }
          // Fragment entries keyed by an int.
          // NOTE(review): the key is presumably the fragment's index within its
          // message - confirm against the code that fills this map.
00046     std::map<int, FragEntry> fragmentMap_;
          // Wall-clock time at which this container was constructed.
00047     time_t creationTime_;
          // Initialised to 0 here; presumably updated whenever a fragment is
          // added (the update is not visible in this header).
00048     time_t lastFragmentTime_;
00049   };
00050 
00051   class FragmentCollector
00052   {
          // Storage-manager fragment collector. Per the comment at the top of this
          // file, all buffers passed in on the queues are owned by this object.
          // The interface exposes start()/join()/stop(), two edm::EventBuffer
          // queues (fragment and command), setters for collaborating objects
          // (event servers, init-message collection, sender list) and setters for
          // DQM and file options. Only the inline members are defined here; the
          // rest live outside this header.
          // NOTE(review): start() presumably launches run() on the boost::thread
          // held in me_ - confirm against the implementation file.
00053   public:
          // Callback used to release a received buffer. Note that this is a plain
          // function pointer, so functors cannot be passed even though the file
          // header comment says they should be allowed.
00054     typedef void (*Deleter)(void*);
          // Raw byte storage (type of event_area_).
00055     typedef std::vector<unsigned char> Buffer;
00056 
00057     // This is not the most efficient way to store and manipulate this
00058     // type of data.  It is like this because there is not much time
00059     // available to create the prototype.
00060     typedef std::map<stor::FragKey, FragmentContainer> Collection;
00061 
          // Two construction paths: one takes the HLTInfo by reference, the other
          // takes it wrapped in a std::auto_ptr. Both also take the buffer
          // deleter, the application logger and an optional configuration string
          // (defaults to "").
          // NOTE(review): the auto_ptr overload presumably transfers ownership of
          // the HLTInfo to the collector - confirm in the implementation.
00062     FragmentCollector(HLTInfo& h, Deleter d,
00063                       log4cplus::Logger& applicationLogger,
00064                       const std::string& config_str="");
00065     FragmentCollector(std::auto_ptr<HLTInfo>, Deleter d,
00066                       log4cplus::Logger& applicationLogger,
00067                       const std::string& config_str="");
00068     ~FragmentCollector();
00069 
          // Worker life-cycle controls (defined outside this header).
00070     void start();
00071     void join();
00072     void stop();
00073 
          // Queue accessors. Both dereference the stored pointer without a null
          // check, so the queues must have been set up before these are called.
00074     edm::EventBuffer& getFragmentQueue() { return *frag_q_; }
00075     edm::EventBuffer& getCommandQueue() { return *cmd_q_; }
00076     
          // Stores the event server and, when both the server and writer_ are
          // non-null, hands the writer's stream selection table to the server.
00077     void setEventServer(boost::shared_ptr<EventServer>& es) {
00078       eventServer_ = es;
00079       if (eventServer_.get() != NULL && writer_.get() != NULL) {
00080         eventServer_->setStreamSelectionTable(writer_->getStreamSelectionTable());
00081       }
00082     }
          // Plain setters: store the shared init-message collection and the raw
          // (non-smart) sender-list pointer.
          // NOTE(review): nothing here shows who owns the sender list; the
          // collector presumably does not - confirm.
00083     void setInitMsgCollection(boost::shared_ptr<InitMsgCollection>& imColl) { initMsgCollection_ = imColl; }
00084     void setSMRBSenderList(SMFUSenderList* senderList) { smRBSenderList_ = senderList; }
00085 
00086   private:
          // Static entry point taking the collector instance; the signature suits
          // use as a thread function (see me_).
00087     static void run(FragmentCollector*);
          // Processing routines, one per message kind by name; bodies are not
          // visible in this header.
00088     void processFragments();
00089     void processEvent(FragEntry* msg);
00090     void processHeader(FragEntry* msg);
00091     void processDQMEvent(FragEntry* msg);
00092     void processErrorEvent(FragEntry* msg);
00093 
          // Helpers operating on the fragment store. The meaning of the int
          // return values is not documented here - check the implementation.
00094     int assembleFragments(std::map<int, FragEntry>& fragmentMap);
00095     int removeStaleFragments();
00096 
          // Raw pointers to the three queues. Only cmd_q_ and frag_q_ have
          // accessors in this header.
00097     edm::EventBuffer* cmd_q_;
00098     edm::EventBuffer* evtbuf_q_;
00099     edm::EventBuffer* frag_q_;
00100 
          // Deleter supplied at construction, scratch byte buffer, and the
          // per-key fragment store.
00101     Deleter buffer_deleter_;
00102     Buffer event_area_;
00103     Collection fragment_area_;
          // Handle to the worker thread.
00104     boost::shared_ptr<boost::thread> me_;
00105     const edm::ProductRegistry* prods_; // change to shared_ptr ? 
00106     stor::HLTInfo* info_;  // cannot be const when using EP_Runner?
00107 
          // Stale-fragment bookkeeping.
          // NOTE(review): the timeout is presumably in seconds, to match the
          // time_t timestamps - confirm.
00108     time_t lastStaleCheckTime_;
00109     int staleFragmentTimeout_;
00110 
00111   public:
00112 
          // Simple configuration setters.
          // NOTE(review): setNumberOfFileSystems takes an int but stores into a
          // uint32 member, so a negative argument would be converted implicitly
          // (assuming uint32 is an unsigned typedef).
00113     void setNumberOfFileSystems(int disks)   { disks_        = disks; }
00114     void setFileCatalog(std::string catalog) { catalog_      = catalog; }
00115     void setSourceId(std::string sourceId)   { sourceId_     = sourceId; }
00116 
          // DQM option setters: each forwards its argument straight to
          // dqmServiceManager_.
          // NOTE(review): unlike setDQMEventServer below, none of these check
          // dqmServiceManager_ for null before dereferencing it. They are only
          // safe if the constructors always create the manager - confirm.
00117     void setCollateDQM(bool collateDQM)
00118     { dqmServiceManager_->setCollateDQM(collateDQM); }
00119 
00120     void setArchiveDQM(bool archiveDQM)
00121     { dqmServiceManager_->setArchiveDQM(archiveDQM); }
00122 
00123     void setArchiveIntervalDQM(int archiveInterval)
00124     { dqmServiceManager_->setArchiveInterval(archiveInterval); }
00125 
00126     void setPurgeTimeDQM(int purgeTimeDQM)
00127     { dqmServiceManager_->setPurgeTime(purgeTimeDQM);}
00128 
00129     void setReadyTimeDQM(int readyTimeDQM)
00130     { dqmServiceManager_->setReadyTime(readyTimeDQM);}
00131 
00132     void setFilePrefixDQM(std::string filePrefixDQM)
00133     { dqmServiceManager_->setFilePrefix(filePrefixDQM);}
00134 
00135     void setUseCompressionDQM(bool useCompressionDQM)
00136     { dqmServiceManager_->setUseCompression(useCompressionDQM);}
00137 
00138     void setCompressionLevelDQM(int compressionLevelDQM)
00139     { dqmServiceManager_->setCompressionLevel(compressionLevelDQM);}
00140 
          // Passes the DQM event server to the DQM service manager when the
          // manager exists, and always keeps a copy of the shared_ptr locally.
00141     void setDQMEventServer(boost::shared_ptr<DQMEventServer>& es)
00142     {
00143       // The auto_ptr still owns the memory after this get()
00144       if (dqmServiceManager_.get() != NULL) dqmServiceManager_->setDQMEventServer(es);
00145       DQMeventServer_ = es;
00146     }
          // Returns a reference to the stored shared_ptr (not a copy).
00147     boost::shared_ptr<DQMEventServer>& getDQMEventServer() { return DQMeventServer_; }
00148 
          // Read-through accessors: each returns whatever writer_ reports.
          // NOTE(review): writer_ is dereferenced without a null check, although
          // setEventServer treats a null writer_ as possible.
          // NOTE(review): std::list and std::auto_ptr are used in this header
          // without direct includes of <list> and <memory>; they must be arriving
          // transitively.
00149     std::list<std::string>& get_filelist() { return writer_->get_filelist();  }
00150     std::list<std::string>& get_currfiles() { return writer_->get_currfiles(); }
00151     std::vector<uint32>& get_storedEvents() { return writer_->get_storedEvents(); }
00152     std::vector<std::string>& get_storedNames() { return writer_->get_storedNames(); }
00153     boost::shared_ptr<stor::SMOnlyStats> get_stats() { return writer_->get_stats(); }
00154   private:
          // Run and output configuration. runNumber_ has no setter in this
          // header; the other three are written by the setters above.
00155     uint32 runNumber_;
00156     uint32 disks_;
00157     std::string catalog_;
00158     std::string sourceId_;
          // Reference to the logger passed to the constructor. A reference member
          // means the logger must outlive this object and the class is not
          // assignable.
00159     log4cplus::Logger& applicationLogger_;
00160 
          // Exclusively owned service managers (std::auto_ptr).
00161     std::auto_ptr<edm::ServiceManager> writer_;
00162     std::auto_ptr<stor::DQMServiceManager> dqmServiceManager_;
00163 
          // Shared collaborators installed through the setters above, plus the
          // raw sender-list pointer.
00164     boost::shared_ptr<EventServer> eventServer_;
00165     boost::shared_ptr<DQMEventServer> DQMeventServer_;
00166     boost::shared_ptr<InitMsgCollection> initMsgCollection_;
00167     SMFUSenderList* smRBSenderList_;
00168   };
00169 }
00170 
00171 #endif

Generated on Tue Jun 9 17:34:52 2009 for CMSSW by  doxygen 1.5.4