00001 #ifndef HLT_FRAG_COLL_HPP
00002 #define HLT_FRAG_COLL_HPP
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "IOPool/Streamer/interface/HLTInfo.h"
00016 #include "IOPool/Streamer/interface/EventBuffer.h"
00017 #include "IOPool/Streamer/interface/Utilities.h"
00018 #include "IOPool/Streamer/interface/MsgTools.h"
00019 #include "IOPool/Streamer/interface/EventMessage.h"
00020
00021 #include "DataFormats/Provenance/interface/ProductRegistry.h"
00022
00023 #include "EventFilter/StorageManager/interface/EvtMsgRingBuffer.h"
00024 #include "EventFilter/StorageManager/interface/EventServer.h"
00025 #include "EventFilter/StorageManager/interface/DQMEventServer.h"
00026 #include "EventFilter/StorageManager/interface/ServiceManager.h"
00027 #include "EventFilter/StorageManager/interface/DQMServiceManager.h"
00028 #include "EventFilter/StorageManager/interface/InitMsgCollection.h"
00029 #include "EventFilter/StorageManager/interface/SMPerformanceMeter.h"
00030 #include "EventFilter/StorageManager/interface/SMFUSenderList.h"
00031
00032 #include "boost/shared_ptr.hpp"
00033 #include "boost/thread/thread.hpp"
00034
00035 #include "log4cplus/logger.h"
00036
00037 #include <vector>
00038 #include <map>
00039 #include <string>
00040
00041 namespace stor
00042 {
00043 struct FragmentContainer
00044 {
00045 FragmentContainer():creationTime_(time(0)),lastFragmentTime_(0) { }
00046 std::map<int, FragEntry> fragmentMap_;
00047 time_t creationTime_;
00048 time_t lastFragmentTime_;
00049 };
00050
00051 class FragmentCollector
00052 {
00053 public:
00054 typedef void (*Deleter)(void*);
00055 typedef std::vector<unsigned char> Buffer;
00056
00057
00058
00059
00060 typedef std::map<stor::FragKey, FragmentContainer> Collection;
00061
00062 FragmentCollector(HLTInfo& h, Deleter d,
00063 log4cplus::Logger& applicationLogger,
00064 const std::string& config_str="");
00065 FragmentCollector(std::auto_ptr<HLTInfo>, Deleter d,
00066 log4cplus::Logger& applicationLogger,
00067 const std::string& config_str="");
00068 ~FragmentCollector();
00069
00070 void start();
00071 void join();
00072 void stop();
00073
00074 edm::EventBuffer& getFragmentQueue() { return *frag_q_; }
00075 edm::EventBuffer& getCommandQueue() { return *cmd_q_; }
00076
00077 void setEventServer(boost::shared_ptr<EventServer>& es) {
00078 eventServer_ = es;
00079 if (eventServer_.get() != NULL && writer_.get() != NULL) {
00080 eventServer_->setStreamSelectionTable(writer_->getStreamSelectionTable());
00081 }
00082 }
00083 void setInitMsgCollection(boost::shared_ptr<InitMsgCollection>& imColl) { initMsgCollection_ = imColl; }
00084 void setSMRBSenderList(SMFUSenderList* senderList) { smRBSenderList_ = senderList; }
00085
00086 private:
00087 static void run(FragmentCollector*);
00088 void processFragments();
00089 void processEvent(FragEntry* msg);
00090 void processHeader(FragEntry* msg);
00091 void processDQMEvent(FragEntry* msg);
00092 void processErrorEvent(FragEntry* msg);
00093
00094 int assembleFragments(std::map<int, FragEntry>& fragmentMap);
00095 int removeStaleFragments();
00096
00097 edm::EventBuffer* cmd_q_;
00098 edm::EventBuffer* evtbuf_q_;
00099 edm::EventBuffer* frag_q_;
00100
00101 Deleter buffer_deleter_;
00102 Buffer event_area_;
00103 Collection fragment_area_;
00104 boost::shared_ptr<boost::thread> me_;
00105 const edm::ProductRegistry* prods_;
00106 stor::HLTInfo* info_;
00107
00108 time_t lastStaleCheckTime_;
00109 int staleFragmentTimeout_;
00110
00111 public:
00112
00113 void setNumberOfFileSystems(int disks) { disks_ = disks; }
00114 void setFileCatalog(std::string catalog) { catalog_ = catalog; }
00115 void setSourceId(std::string sourceId) { sourceId_ = sourceId; }
00116
00117 void setCollateDQM(bool collateDQM)
00118 { dqmServiceManager_->setCollateDQM(collateDQM); }
00119
00120 void setArchiveDQM(bool archiveDQM)
00121 { dqmServiceManager_->setArchiveDQM(archiveDQM); }
00122
00123 void setArchiveIntervalDQM(int archiveInterval)
00124 { dqmServiceManager_->setArchiveInterval(archiveInterval); }
00125
00126 void setPurgeTimeDQM(int purgeTimeDQM)
00127 { dqmServiceManager_->setPurgeTime(purgeTimeDQM);}
00128
00129 void setReadyTimeDQM(int readyTimeDQM)
00130 { dqmServiceManager_->setReadyTime(readyTimeDQM);}
00131
00132 void setFilePrefixDQM(std::string filePrefixDQM)
00133 { dqmServiceManager_->setFilePrefix(filePrefixDQM);}
00134
00135 void setUseCompressionDQM(bool useCompressionDQM)
00136 { dqmServiceManager_->setUseCompression(useCompressionDQM);}
00137
00138 void setCompressionLevelDQM(int compressionLevelDQM)
00139 { dqmServiceManager_->setCompressionLevel(compressionLevelDQM);}
00140
00141 void setDQMEventServer(boost::shared_ptr<DQMEventServer>& es)
00142 {
00143
00144 if (dqmServiceManager_.get() != NULL) dqmServiceManager_->setDQMEventServer(es);
00145 DQMeventServer_ = es;
00146 }
00147 boost::shared_ptr<DQMEventServer>& getDQMEventServer() { return DQMeventServer_; }
00148
00149 std::list<std::string>& get_filelist() { return writer_->get_filelist(); }
00150 std::list<std::string>& get_currfiles() { return writer_->get_currfiles(); }
00151 std::vector<uint32>& get_storedEvents() { return writer_->get_storedEvents(); }
00152 std::vector<std::string>& get_storedNames() { return writer_->get_storedNames(); }
00153 boost::shared_ptr<stor::SMOnlyStats> get_stats() { return writer_->get_stats(); }
00154 private:
00155 uint32 runNumber_;
00156 uint32 disks_;
00157 std::string catalog_;
00158 std::string sourceId_;
00159 log4cplus::Logger& applicationLogger_;
00160
00161 std::auto_ptr<edm::ServiceManager> writer_;
00162 std::auto_ptr<stor::DQMServiceManager> dqmServiceManager_;
00163
00164 boost::shared_ptr<EventServer> eventServer_;
00165 boost::shared_ptr<DQMEventServer> DQMeventServer_;
00166 boost::shared_ptr<InitMsgCollection> initMsgCollection_;
00167 SMFUSenderList* smRBSenderList_;
00168 };
00169 }
00170
00171 #endif