CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
DataManager.cc
Go to the documentation of this file.
1 // $Id: DataManager.cc,v 1.3 2011/03/24 17:26:25 mommsen Exp $
3 
8 #include "EventFilter/SMProxyServer/src/EventRetriever.icc"
10 
11 #include <boost/foreach.hpp>
12 #include <boost/pointer_cast.hpp>
13 
14 
15 namespace smproxy
16 {
// DataManager constructor; the cross-reference index gives its signature as
// DataManager(StateMachine *). The listing omits source line 17 (the name line).
// Stores the state machine pointer, caches the registration queue obtained
// from it, and launches the watchdog thread.
// NOTE(review): stateMachine is dereferenced in the initializer list without
// a null check - confirm callers never pass a null pointer.
18  (
19  StateMachine* stateMachine
20  ) :
21  stateMachine_(stateMachine),
22  registrationQueue_(stateMachine->getRegistrationQueue())
23  {
// The watchdog thread is bound to checkForStaleConsumers() on this object.
24  watchDogThread_.reset(
25  new boost::thread( boost::bind( &DataManager::checkForStaleConsumers, this) )
26  );
27  }
28 
// Presumably the destructor (the listing omits source line 29 with the name).
// Calls stop(), then interrupts the watchdog thread and waits for it to end.
// NOTE(review): stop() as listed calls thread_->join() unconditionally, and
// thread_ is only seen being set in start(); if start() was never called this
// would dereference an empty scoped_ptr - confirm.
30  {
31  stop();
32  watchDogThread_->interrupt();
33  watchDogThread_->join();
34  }
35 
36 
// start(); the cross-reference index gives the signature as
// void start(DataRetrieverParams const &), defined at DataManager.cc:37.
// The listing omits source line 37 (the signature) and line 39 (the first
// statement of the body).
// Drops all existing data and DQM event retrievers, clears edm::shutdown_flag
// and launches the worker thread.
38  {
40  dataEventRetrievers_.clear();
41  dqmEventRetrievers_.clear();
42  edm::shutdown_flag = false;
// NOTE(review): the thread is bound to doIt() directly. Another function in
// this file wraps doIt() in try/catch handlers that call moveToFailedState();
// exceptions escaping doIt() on this thread do not pass through that wrapper -
// confirm which entry point is intended.
43  thread_.reset(
44  new boost::thread( boost::bind( &DataManager::doIt, this) )
45  );
46  }
47 
48 
// Presumably stop() (the listing omits source line 49 with the name).
// Asks the worker thread to finish, waits for it, raises edm::shutdown_flag
// and then calls stop() on the mapped value of every element visited by the
// two BOOST_FOREACH loops.
50  {
51  // enqueue a dummy RegistrationInfoBase to tell the thread to stop
52  registrationQueue_->enqWait( stor::RegPtr() );
// NOTE(review): thread_ is dereferenced without checking that start() has
// created it - confirm stop() cannot run before start().
53  thread_->join();
54 
55  edm::shutdown_flag = true;
56 
// The listing omits source lines 58-59 and 63-64 (the BOOST_FOREACH arguments).
// Presumably the loops iterate dataEventRetrievers_ and dqmEventRetrievers_
// respectively - verify against the original file.
57  BOOST_FOREACH(
60  ) pair.second->stop();
61 
62  BOOST_FOREACH(
65  ) pair.second->stop();
66  }
67 
68 
// getQueueIDsFromDataEventRetrievers(); per the cross-reference index it
// returns bool. The listing omits source line 69 (return type and name).
// Looks up the data event retriever registered under eventConsumer and, if
// found, copies its queue IDs into queueIDs.
// Returns false when eventConsumer is null or no retriever is stored under
// that key; queueIDs is left untouched in both cases. Returns true otherwise.
70  (
71  stor::EventConsRegPtr eventConsumer,
72  stor::QueueIDs& queueIDs
73  ) const
74  {
75  if ( ! eventConsumer ) return false;
76 
77  DataEventRetrieverMap::const_iterator pos =
78  dataEventRetrievers_.find(eventConsumer);
79  if ( pos == dataEventRetrievers_.end() ) return false;
80 
81  queueIDs = pos->second->getQueueIDs();
82  return true;
83  }
84 
85 
// getQueueIDsFromDQMEventRetrievers(); per the cross-reference index it
// returns bool. The listing omits source line 86 (return type and name).
// Looks up the DQM event retriever registered under dqmEventConsumer and, if
// found, copies its queue IDs into queueIDs.
// Returns false when dqmEventConsumer is null or no retriever is stored under
// that key; queueIDs is left untouched in both cases. Returns true otherwise.
87  (
88  stor::DQMEventConsRegPtr dqmEventConsumer,
89  stor::QueueIDs& queueIDs
90  ) const
91  {
92  if ( ! dqmEventConsumer ) return false;
93 
94  DQMEventRetrieverMap::const_iterator pos =
95  dqmEventRetrievers_.find(dqmEventConsumer);
96  if ( pos == dqmEventRetrievers_.end() ) return false;
97 
98  queueIDs = pos->second->getQueueIDs();
99  return true;
100  }
101 
102 
// Exception-handling wrapper around doIt(). The listing omits source line 103
// with the function name (presumably activity() - verify).
// std::exception and unknown exceptions are converted into an
// exception::Exception and passed to stateMachine_->moveToFailedState().
104  {
105  try
106  {
107  doIt();
108  }
109  catch(xcept::Exception &e)
110  {
// The listing omits source line 111, the body of this handler. Presumably it
// forwards e to stateMachine_->moveToFailedState() - verify.
112  }
113  catch(std::exception &e)
114  {
115  XCEPT_DECLARE(exception::Exception,
116  sentinelException, e.what());
117  stateMachine_->moveToFailedState(sentinelException);
118  }
119  catch(...)
120  {
// NOTE(review): this message mentions the watch dog although this wrapper
// guards doIt(); the same text appears in the other wrapper in this file -
// possibly a copy-paste leftover, confirm.
121  std::string errorMsg = "Unknown exception in watch dog";
122  XCEPT_DECLARE(exception::Exception,
123  sentinelException, errorMsg);
124  stateMachine_->moveToFailedState(sentinelException);
125  }
126  }
127 
128 
// Registration processing loop. The listing omits source line 129 with the
// function name (presumably doIt() - verify).
// Blocks on the registration queue and hands each dequeued registration to
// addEventConsumer() and, if that declines it, to addDQMEventConsumer().
// The loop ends on the first registration that neither accepts.
130  {
131  stor::RegPtr regPtr;
132  bool process(true);
133 
// A DQMArchiver is created for the lifetime of this loop and its own
// registration is added as a DQM event consumer before any queue entry is read.
134  DQMArchiver dqmArchiver(stateMachine_);
135  addDQMEventConsumer(dqmArchiver.getRegPtr());
136 
137  do
138  {
139  registrationQueue_->deqWait(regPtr);
140 
// Short-circuit evaluation: addDQMEventConsumer() is only tried when
// addEventConsumer() returned false. Both return false for a null pointer,
// so the empty stor::RegPtr enqueued by the stop routine ends the loop.
141  if ( ! (addEventConsumer(regPtr) || addDQMEventConsumer(regPtr)) )
142  {
143  // base type received, signalling the end of the run
144  process = false;
145  }
146  } while (process);
147  }
148 
149 
// addEventConsumer(); the cross-reference index gives the signature as
// bool addEventConsumer(stor::RegPtr), defined at DataManager.cc:150.
// The listing omits source line 150 (the signature).
// Returns false if regPtr does not point to an EventConsumerRegistrationInfo
// (including a null regPtr). Otherwise the consumer is attached to a data
// event retriever - a new one if no equivalent key exists - and true is
// returned.
151  {
152  stor::EventConsRegPtr eventConsumer =
153  boost::dynamic_pointer_cast<stor::EventConsumerRegistrationInfo>(regPtr);
154 
155  if ( ! eventConsumer ) return false;
156 
// lower_bound() yields the first element whose key is not ordered before
// eventConsumer. If that is end(), or eventConsumer is ordered before that
// key, no equivalent key is stored yet.
157  DataEventRetrieverMap::iterator pos = dataEventRetrievers_.lower_bound(eventConsumer);
158  if (
159  pos == dataEventRetrievers_.end() ||
160  (dataEventRetrievers_.key_comp()(eventConsumer, pos->first))
161  )
162  {
163  // no retriever found for this event requests
164  DataEventRetrieverPtr dataEventRetriever(
165  new DataEventRetriever(stateMachine_, eventConsumer)
166  );
// pos is passed as an insertion hint, reusing the position found above.
167  dataEventRetrievers_.insert(pos,
168  DataEventRetrieverMap::value_type(eventConsumer, dataEventRetriever));
169  }
170  else
171  {
// An equivalent key already has a retriever: attach this consumer to it.
172  pos->second->addConsumer( eventConsumer );
173  }
174 
175  return true;
176  }
177 
178 
// addDQMEventConsumer(); the cross-reference index gives the signature as
// bool addDQMEventConsumer(stor::RegPtr), defined at DataManager.cc:179.
// The listing omits source line 179 (the signature).
// Returns false if regPtr does not point to a DQMEventConsumerRegistrationInfo
// (including a null regPtr). Otherwise the consumer is attached to a DQM
// event retriever - a new one if no equivalent key exists - and true is
// returned.
180  {
181  stor::DQMEventConsRegPtr dqmEventConsumer =
182  boost::dynamic_pointer_cast<stor::DQMEventConsumerRegistrationInfo>(regPtr);
183 
184  if ( ! dqmEventConsumer ) return false;
185 
// lower_bound() yields the first element whose key is not ordered before
// dqmEventConsumer. If that is end(), or dqmEventConsumer is ordered before
// that key, no equivalent key is stored yet.
186  DQMEventRetrieverMap::iterator pos =
187  dqmEventRetrievers_.lower_bound(dqmEventConsumer);
188  if (
189  pos == dqmEventRetrievers_.end() ||
190  (dqmEventRetrievers_.key_comp()(dqmEventConsumer, pos->first)) )
191  {
192  // no retriever found for this DQM event requests
193  DQMEventRetrieverPtr dqmEventRetriever(
194  new DQMEventRetriever(stateMachine_, dqmEventConsumer)
195  );
// pos is passed as an insertion hint, reusing the position found above.
196  dqmEventRetrievers_.insert(pos,
197  DQMEventRetrieverMap::value_type(dqmEventConsumer, dqmEventRetriever));
198  }
199  else
200  {
// An equivalent key already has a retriever: attach this consumer to it.
201  pos->second->addConsumer( dqmEventConsumer );
202  }
203 
204  return true;
205  }
206 
207 
// Exception-handling wrapper. The listing omits source line 208 with the
// function name (presumably watchDog() - verify).
// A boost::thread_interrupted exception is swallowed silently; std::exception
// and unknown exceptions are converted into an exception::Exception and
// passed to stateMachine_->moveToFailedState().
// NOTE(review): the constructor binds the watchdog thread to
// checkForStaleConsumers() directly, so these handlers only apply if something
// else calls this wrapper - confirm.
209  {
210  try
211  {
// The listing omits source line 212, the guarded call. Presumably it is
// checkForStaleConsumers() - verify.
213  }
214  catch(boost::thread_interrupted)
215  {
216  // thread was interrupted.
217  }
218  catch(xcept::Exception &e)
219  {
// The listing omits source line 220, the body of this handler. Presumably it
// forwards e to stateMachine_->moveToFailedState() - verify.
221  }
222  catch(std::exception &e)
223  {
224  XCEPT_DECLARE(exception::Exception,
225  sentinelException, e.what());
226  stateMachine_->moveToFailedState(sentinelException);
227  }
228  catch(...)
229  {
230  std::string errorMsg = "Unknown exception in watch dog";
231  XCEPT_DECLARE(exception::Exception,
232  sentinelException, errorMsg);
233  stateMachine_->moveToFailedState(sentinelException);
234  }
235  }
236 
237 
// Stale-queue cleanup loop. The listing omits source line 238 with the
// function name (presumably checkForStaleConsumers() - verify).
// Repeatedly calls clearStaleQueues(now) on the event and DQM event queue
// collections.
239  {
// The listing omits source lines 241 and 243, the initialisers of the two
// collection pointers. Presumably they come from
// stateMachine_->getEventQueueCollection() and
// stateMachine_->getDQMEventQueueCollection() - verify.
240  EventQueueCollectionPtr eventQueueCollection =
242  stor::DQMEventQueueCollectionPtr dqmEventQueueCollection =
244 
// The loop has no exit condition, so it can only end through an exception.
// NOTE(review): presumably a boost::thread_interrupted raised in the omitted
// lines after the destructor path interrupts watchDogThread_ - confirm.
245  while (true)
246  {
// The listing omits source lines 247-248, where `now` must be defined.
// Presumably a sleep followed by a getCurrentTime() call, as both appear in
// the cross-reference index - verify.
249  eventQueueCollection->clearStaleQueues(now);
250  dqmEventQueueCollection->clearStaleQueues(now);
251  }
252  }
253 
254 } // namespace smproxy
255 
TimePoint_t getCurrentTime()
Definition: Utils.h:158
stor::RegistrationQueuePtr registrationQueue_
Definition: DataManager.h:85
boost::shared_ptr< RegistrationInfoBase > RegPtr
double seconds()
boost::shared_ptr< DataEventRetriever > DataEventRetrieverPtr
Definition: DataManager.h:93
DataEventRetrieverMap dataEventRetrievers_
Definition: DataManager.h:96
std::vector< QueueID > QueueIDs
Definition: QueueID.h:80
void start(DataRetrieverParams const &)
Definition: DataManager.cc:37
boost::shared_ptr< stor::EventConsumerRegistrationInfo > EventConsRegPtr
void moveToFailedState(xcept::Exception &e)
Definition: StateMachine.h:79
DataManager(StateMachine *)
Definition: DataManager.cc:18
void sleep(Duration_t)
Definition: Utils.h:163
stor::DQMEventQueueCollectionPtr getDQMEventQueueCollection() const
Definition: StateMachine.h:105
bool addDQMEventConsumer(stor::RegPtr)
Definition: DataManager.cc:179
boost::shared_ptr< DQMEventQueueCollection > DQMEventQueueCollectionPtr
EventQueueCollectionPtr getEventQueueCollection() const
Definition: StateMachine.h:103
boost::scoped_ptr< boost::thread > watchDogThread_
Definition: DataManager.h:89
boost::posix_time::ptime TimePoint_t
Definition: Utils.h:35
Container::value_type value_type
stor::RegistrationQueuePtr getRegistrationQueue() const
Definition: StateMachine.h:99
DataRetrieverParams dataRetrieverParams_
Definition: DataManager.h:86
bool addEventConsumer(stor::RegPtr)
Definition: DataManager.cc:150
EventRetriever< stor::EventConsumerRegistrationInfo, EventQueueCollectionPtr > DataEventRetriever
Definition: DataManager.h:92
boost::shared_ptr< EventQueueCollection > EventQueueCollectionPtr
boost::scoped_ptr< boost::thread > thread_
Definition: DataManager.h:88
DQMEventRetrieverMap dqmEventRetrievers_
Definition: DataManager.h:103
bool getQueueIDsFromDQMEventRetrievers(stor::DQMEventConsRegPtr, stor::QueueIDs &) const
Definition: DataManager.cc:87
const stor::DQMEventConsRegPtr & getRegPtr() const
Definition: DQMArchiver.h:41
boost::shared_ptr< DQMEventRetriever > DQMEventRetrieverPtr
Definition: DataManager.h:100
volatile bool shutdown_flag
StateMachine * stateMachine_
Definition: DataManager.h:84
tuple process
Definition: LaserDQM_cfg.py:3
EventRetriever< stor::DQMEventConsumerRegistrationInfo, stor::DQMEventQueueCollectionPtr > DQMEventRetriever
Definition: DataManager.h:99
bool getQueueIDsFromDataEventRetrievers(stor::EventConsRegPtr, stor::QueueIDs &) const
Definition: DataManager.cc:70
boost::shared_ptr< stor::DQMEventConsumerRegistrationInfo > DQMEventConsRegPtr