CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StorageManager.cc
Go to the documentation of this file.
1 // $Id: StorageManager.cc,v 1.139 2013/01/07 11:30:00 eulisse Exp $
3 
15 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
16 
18 
20 
21 #include "interface/shared/version.h"
22 #include "interface/shared/i2oXFunctionCodes.h"
23 #include "xcept/tools.h"
24 #include "xdaq/NamespaceURI.h"
25 #include "xdata/InfoSpaceFactory.h"
26 #include "xgi/Method.h"
27 #include "xoap/Method.h"
28 
29 #include <boost/lexical_cast.hpp>
30 #include <boost/shared_ptr.hpp>
31 
32 #include <cstdlib>
33 
34 using namespace std;
35 using namespace stor;
36 
37 
38 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
39  xdaq::Application(s)
40 {
41  LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
42 
43  // bind all callback functions
48 
49  std::string errorMsg = "Exception in StorageManager constructor: ";
50  try
51  {
53  }
54  catch(std::exception &e)
55  {
56  errorMsg += e.what();
57  LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
58  XCEPT_RAISE( stor::exception::Exception, e.what() );
59  }
60  catch(...)
61  {
62  errorMsg += "unknown exception";
63  LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
64  XCEPT_RAISE( stor::exception::Exception, errorMsg );
65  }
66 
68 }
69 
70 
72 {
73  i2o::bind(this,
76  XDAQ_ORGANIZATION_ID);
77  i2o::bind(this,
80  XDAQ_ORGANIZATION_ID);
81  i2o::bind(this,
84  XDAQ_ORGANIZATION_ID);
85  i2o::bind(this,
87  I2O_SM_DQM,
88  XDAQ_ORGANIZATION_ID);
89  i2o::bind(this,
91  I2O_EVM_LUMISECTION,
92  XDAQ_ORGANIZATION_ID);
93 }
94 
95 
97 {
98  xoap::bind( this,
100  "Configure",
101  XDAQ_NS_URI );
102  xoap::bind( this,
104  "Enable",
105  XDAQ_NS_URI );
106  xoap::bind( this,
108  "Stop",
109  XDAQ_NS_URI );
110  xoap::bind( this,
112  "Halt",
113  XDAQ_NS_URI );
114  xoap::bind( this,
116  "EmergencyStop",
117  XDAQ_NS_URI );
118 }
119 
120 
122 {
123  xgi::bind(this,&StorageManager::css, "styles.css");
124  xgi::bind(this,&StorageManager::defaultWebPage, "Default");
125  xgi::bind(this,&StorageManager::inputWebPage, "input");
126  xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
127  xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
128  xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
129  xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
130  xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
131  xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
132  xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
133  xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
134 }
135 
136 
138 {
139  // event consumers
140  xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
141  xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
142  xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
143 
144  // dqm event consumers
145  xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
146  xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
147 }
148 
149 
151 {
152  sharedResources_.reset(new SharedResources());
153 
154  xdata::InfoSpace *ispace = getApplicationInfoSpace();
155  unsigned long instance = getApplicationDescriptor()->getInstance();
156  sharedResources_->configuration_.reset(new Configuration(ispace, instance));
157 
158  QueueConfigurationParams queueParams =
159  sharedResources_->configuration_->getQueueConfigurationParams();
160  sharedResources_->commandQueue_.
161  reset(new CommandQueue(queueParams.commandQueueSize_));
162  sharedResources_->fragmentQueue_.
163  reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
164  sharedResources_->registrationQueue_.
166  sharedResources_->streamQueue_.
167  reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
168  sharedResources_->dqmEventQueue_.
169  reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
170 
171  sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
172  sharedResources_->statisticsReporter_.reset(
174  );
175  sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
176  sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
177  sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
178 
179  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
180  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
181  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
182 
184  discardManager_.reset(new DiscardManager(getApplicationContext(),
185  getApplicationDescriptor(),
186  sharedResources_->statisticsReporter_->
187  getDataSenderMonitorCollection()));
188 
189  sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
191  sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
192  sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
193 
195  sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
196  sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
197 
198  consumerUtils_.reset( new ConsumerUtils_t(
199  sharedResources_->configuration_,
200  sharedResources_->registrationCollection_,
201  sharedResources_->registrationQueue_,
202  sharedResources_->initMsgCollection_,
203  sharedResources_->eventQueueCollection_,
204  sharedResources_->dqmEventQueueCollection_,
205  sharedResources_->alarmHandler_
206  ) );
207 
209  getApplicationDescriptor(), sharedResources_));
210 
211 }
212 
213 
215 {
216 
217  // Start the workloops
218  try
219  {
221  diskWriter_.reset( new DiskWriter(this, sharedResources_) );
223  sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
224  fragmentProcessor_->startWorkLoop("theFragmentProcessor");
225  diskWriter_->startWorkLoop("theDiskWriter");
226  dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
227  }
228  catch(xcept::Exception &e)
229  {
230  sharedResources_->alarmHandler_->moveToFailedState( e );
231  }
232  catch(std::exception &e)
233  {
234  XCEPT_DECLARE(stor::exception::Exception,
235  sentinelException, e.what());
236  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
237  }
238  catch(...)
239  {
240  std::string errorMsg = "Unknown exception when starting the workloops";
241  XCEPT_DECLARE(stor::exception::Exception,
242  sentinelException, errorMsg);
243  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
244  }
245 }
246 
247 
249 // I2O call back functions //
251 
252 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
253 {
254  I2OChain i2oChain(ref);
255 
256  // Set the I2O message pool pointer. Only done for init messages.
257  ThroughputMonitorCollection& throughputMonCollection =
258  sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
259  throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
260 
261  FragmentMonitorCollection& fragMonCollection =
262  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
263  fragMonCollection.getAllFragmentSizeMQ().addSample(
264  static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
265  );
266 
267  sharedResources_->fragmentQueue_->enqWait(i2oChain);
268 }
269 
270 
271 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
272 {
273  I2OChain i2oChain(ref);
274 
275  FragmentMonitorCollection& fragMonCollection =
276  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
277  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
278 
279  sharedResources_->fragmentQueue_->enqWait(i2oChain);
280 
281 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
282  double r = rand()/static_cast<double>(RAND_MAX);
283  if (r < 0.001)
284  {
285  LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
286  receiveDataMessage(ref->duplicate());
287  }
288 #endif
289 }
290 
291 
292 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
293 {
294  I2OChain i2oChain(ref);
295 
296  FragmentMonitorCollection& fragMonCollection =
297  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
298  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
299 
300  sharedResources_->fragmentQueue_->enqWait(i2oChain);
301 }
302 
303 
304 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref) throw (i2o::exception::Exception)
305 {
306  I2OChain i2oChain(ref);
307 
308  FragmentMonitorCollection& fragMonCollection =
309  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
310  fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
311 
312  sharedResources_->fragmentQueue_->enqWait(i2oChain);
313 }
314 
315 
317 {
318  I2OChain i2oChain( ref );
319 
320  FragmentMonitorCollection& fragMonCollection =
321  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
322  fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
323 
324  RunMonitorCollection& runMonCollection =
325  sharedResources_->statisticsReporter_->getRunMonitorCollection();
326  runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
327 
328  sharedResources_->streamQueue_->enqWait( i2oChain );
329 }
330 
331 
333 // Web interface call back functions //
335 
338 {
339  smWebPageHelper_->css(in,out);
340 }
341 
342 
345 {
346  std::string errorMsg = "Failed to create the default webpage";
347 
348  try
349  {
350  smWebPageHelper_->defaultWebPage(out);
351  }
352  catch(std::exception &e)
353  {
354  errorMsg += ": ";
355  errorMsg += e.what();
356 
357  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
358  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
359  }
360  catch(...)
361  {
362  errorMsg += ": Unknown exception";
363 
364  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
365  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
366  }
367 }
368 
369 
372 {
373  std::string errorMsg = "Failed to create the I2O input webpage";
374 
375  try
376  {
377  smWebPageHelper_->inputWebPage(out);
378  }
379  catch(std::exception &e)
380  {
381  errorMsg += ": ";
382  errorMsg += e.what();
383 
384  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
385  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
386  }
387  catch(...)
388  {
389  errorMsg += ": Unknown exception";
390 
391  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
392  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
393  }
394 }
395 
396 
399 {
400  std::string errorMsg = "Failed to create the stored data webpage";
401 
402  try
403  {
404  smWebPageHelper_->storedDataWebPage(out);
405  }
406  catch(std::exception &e)
407  {
408  errorMsg += ": ";
409  errorMsg += e.what();
410 
411  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
412  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
413  }
414  catch(...)
415  {
416  errorMsg += ": Unknown exception";
417 
418  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
419  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
420  }
421 }
422 
423 
425  xgi::Output* out )
427 {
428 
429  std::string err_msg =
430  "Failed to create consumer statistics page";
431 
432  try
433  {
434  smWebPageHelper_->consumerStatistics(out);
435  }
436  catch( std::exception &e )
437  {
438  err_msg += ": ";
439  err_msg += e.what();
440  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
441  XCEPT_RAISE( xgi::exception::Exception, err_msg );
442  }
443  catch(...)
444  {
445  err_msg += ": Unknown exception";
446  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
447  XCEPT_RAISE( xgi::exception::Exception, err_msg );
448  }
449 
450 }
451 
452 
455 {
456  std::string errorMsg = "Failed to create the data sender webpage";
457 
458  try
459  {
460  smWebPageHelper_->resourceBrokerOverview(out);
461  }
462  catch(std::exception &e)
463  {
464  errorMsg += ": ";
465  errorMsg += e.what();
466 
467  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
468  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
469  }
470  catch(...)
471  {
472  errorMsg += ": Unknown exception";
473 
474  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
475  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
476  }
477 
478 }
479 
480 
483 {
484  std::string errorMsg = "Failed to create the data sender webpage";
485 
486  try
487  {
488  long long localRBID = 0;
489  cgicc::Cgicc cgiWrapper(in);
490  cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
491  if (updateRef != cgiWrapper.getElements().end())
492  {
493  std::string idString = updateRef->getValue();
494  localRBID = boost::lexical_cast<long long>(idString);
495  }
496 
497  smWebPageHelper_->resourceBrokerDetail(out, localRBID);
498  }
499  catch(std::exception &e)
500  {
501  errorMsg += ": ";
502  errorMsg += e.what();
503 
504  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
505  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
506  }
507  catch(...)
508  {
509  errorMsg += ": Unknown exception";
510 
511  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
512  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
513  }
514 
515 }
516 
517 
520 {
521  std::string errorMsg = "Failed to create the file statistics webpage";
522 
523  try
524  {
525  smWebPageHelper_->filesWebPage(out);
526  }
527  catch(std::exception &e)
528  {
529  errorMsg += ": ";
530  errorMsg += e.what();
531 
532  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
533  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
534  }
535  catch(...)
536  {
537  errorMsg += ": Unknown exception";
538 
539  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
540  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
541  }
542 
543 }
544 
545 
548 {
549  std::string errorMsg = "Failed to create the DQM event statistics webpage";
550 
551  try
552  {
553  smWebPageHelper_->dqmEventWebPage(out);
554  }
555  catch(std::exception &e)
556  {
557  errorMsg += ": ";
558  errorMsg += e.what();
559 
560  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
561  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
562  }
563  catch(...)
564  {
565  errorMsg += ": Unknown exception";
566 
567  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
568  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
569  }
570 }
571 
572 
575 {
576  std::string errorMsg = "Failed to create the throughput statistics webpage";
577 
578  try
579  {
580  smWebPageHelper_->throughputWebPage(out);
581  }
582  catch(std::exception &e)
583  {
584  errorMsg += ": ";
585  errorMsg += e.what();
586 
587  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
588  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
589  }
590  catch(...)
591  {
592  errorMsg += ": Unknown exception";
593 
594  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
595  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
596  }
597 
598 }
599 
600 
601 // Leaving in for now but making it return an empty buffer:
604 {
605  out->getHTTPResponseHeader().addHeader( "Content-Type",
606  "application/octet-stream" );
607  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
608  "binary" );
609  char buff;
610  out->write( &buff, 0 );
611 }
612 
613 
615 // State Machine call back functions //
617 
618 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
620 {
621  std::string errorMsg;
622  xoap::MessageReference returnMsg;
623 
624  try {
625  errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
627 
628  errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
629  if (command == "Configure")
630  {
631  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
632  }
633  else if (command == "Enable")
634  {
635  if (sharedResources_->configuration_->streamConfigurationHasChanged())
636  {
637  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
638  }
639  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
640  }
641  else if (command == "Stop")
642  {
643  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
644  }
645  else if (command == "Halt")
646  {
647  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
648  }
649  else if (command == "EmergencyStop")
650  {
651  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
652  }
653  else
654  {
655  XCEPT_RAISE(stor::exception::StateMachine,
656  "Received an unknown state machine event '" + command + "'.");
657  }
658 
659  errorMsg = "Failed to create FSM SOAP reply message: ";
660  returnMsg = soaputils::createFsmSoapResponseMsg(command,
661  sharedResources_->statisticsReporter_->
662  getStateMachineMonitorCollection().externallyVisibleState());
663  }
664  catch (cms::Exception& e) {
665  errorMsg += e.explainSelf();
666  XCEPT_DECLARE(xoap::exception::Exception,
667  sentinelException, errorMsg);
668  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
669  throw sentinelException;
670  }
671  catch (xcept::Exception &e) {
672  XCEPT_DECLARE_NESTED(xoap::exception::Exception,
673  sentinelException, errorMsg, e);
674  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
675  throw sentinelException;
676  }
677  catch (std::exception& e) {
678  errorMsg += e.what();
679  XCEPT_DECLARE(xoap::exception::Exception,
680  sentinelException, errorMsg);
681  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
682  throw sentinelException;
683  }
684  catch (...) {
685  errorMsg += "Unknown exception";
686  XCEPT_DECLARE(xoap::exception::Exception,
687  sentinelException, errorMsg);
688  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
689  throw sentinelException;
690  }
691 
692  return returnMsg;
693 }
694 
695 
699 
700 void
703 {
704  consumerUtils_->processConsumerRegistrationRequest(in,out);
705 }
706 
707 
708 void
711 {
712  consumerUtils_->processConsumerHeaderRequest(in,out);
713 }
714 
715 
716 void
719 {
720  consumerUtils_->processConsumerEventRequest(in,out);
721 }
722 
723 
724 void
727 {
728  consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
729 }
730 
731 
732 void
735 {
736  consumerUtils_->processDQMConsumerEventRequest(in,out);
737 }
738 
739 namespace stor {
741  // Specialization for ConsumerUtils //
743  template<>
744  void
747  {
748  writeHTTPHeaders( out );
749 
750  #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
751  double r = rand()/static_cast<double>(RAND_MAX);
752  if (r < 0.1)
753  {
754  std::cout << "Simulating corrupted event header" << std::endl;
756  h->protocolVersion_ = 1;
757  }
758  #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
759 
760  const unsigned int nfrags = evt.fragmentCount();
761  for ( unsigned int i = 0; i < nfrags; ++i )
762  {
763  const unsigned long len = evt.dataSize( i );
764  unsigned char* location = evt.dataLocation( i );
765  out->write( (char*)location, len );
766  }
767  }
768 }
769 
770 
772 // *** Provides factory method for the instantiation of SM applications //
774 // This macro is depreciated:
775 XDAQ_INSTANTIATE(StorageManager)
776 
777 // One should use the XDAQ_INSTANTIATOR() in the header file
778 // and this one here. But this breaks the backward compatibility,
779 // as all xml configuration files would have to be changed to use
780 // 'stor::StorageManager' instead of 'StorageManager'.
781 // XDAQ_INSTANTIATOR_IMPL(stor::StorageManager)
782 
783 
784 
const MonitoredQuantity & getAllFragmentSizeMQ() const
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned int fragmentCount() const
Definition: I2OChain.cc:284
int i
Definition: DBlmapReader.cc:9
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
#define Input(cl)
Definition: vmac.h:189
#define I2O_SM_DQM
Definition: i2oEvfMsgs.h:25
unsigned int dqmEventQueueMemoryLimitMB_
Definition: Configuration.h:91
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
boost::scoped_ptr< DiskWriter > diskWriter_
#define I2O_SM_PREAMBLE
Definition: i2oEvfMsgs.h:21
static PFTauRenderPlugin instance
void inputWebPage(xgi::Input *in, xgi::Output *out)
virtual std::string explainSelf() const
Definition: Exception.cc:146
void receiveDataMessage(toolbox::mem::Reference *ref)
void addSample(const double &value=1)
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
Definition: Configuration.h:93
ConcurrentQueue< RegPtr > RegistrationQueue
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
unsigned int streamQueueMemoryLimitMB_
Definition: Configuration.h:96
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void writeConsumerEvent(xgi::Output *, const stor::I2OChain &) const
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
Definition: SoapUtils.cc:38
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
Definition: DQMEventQueue.h:22
ConcurrentQueue< I2OChain > StreamQueue
Definition: StreamQueue.h:21
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
uint32_t lumiSection() const
Definition: I2OChain.cc:595
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
uint8 protocolVersion_
Definition: EventMessage.h:60
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void setMemoryPoolPointer(toolbox::mem::Pool *)
const MonitoredQuantity & getEoLSSeenMQ() const
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
ConcurrentQueue< EventPtr_t > CommandQueue
Definition: CommandQueue.h:22
QueueCollection< I2OChain > EventQueueCollection
unsigned long totalDataSize() const
Definition: I2OChain.cc:432
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
#define Output(cl)
Definition: vmac.h:193
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
Signal rand(Signal arg)
Definition: vlib.cc:442
unsigned long dataSize(int fragmentIndex) const
Definition: I2OChain.cc:438
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
Definition: SoapUtils.cc:25
void receiveRegistryMessage(toolbox::mem::Reference *ref)
tuple cout
Definition: gather_cfg.py:121
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
void throughputWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void addDQMEventFragmentSample(const double bytecount)
void reset(double vett[256])
Definition: TPedValues.cc:11
ConcurrentQueue< I2OChain > FragmentQueue
Definition: FragmentQueue.h:21
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void addEventFragmentSample(const double bytecount)
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void receiveDQMMessage(toolbox::mem::Reference *ref)
void addFragmentSample(const double bytecount)
unsigned char * dataLocation(int fragmentIndex) const
Definition: I2OChain.cc:444