CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StorageManager.cc
Go to the documentation of this file.
1 // $Id: StorageManager.cc,v 1.138 2011/11/17 17:35:40 mommsen Exp $
3 
15 #include "EventFilter/StorageManager/src/ConsumerUtils.icc"
16 
18 
20 
21 #include "i2o/Method.h"
22 #include "interface/shared/version.h"
23 #include "interface/shared/i2oXFunctionCodes.h"
24 #include "xcept/tools.h"
25 #include "xdaq/NamespaceURI.h"
26 #include "xdata/InfoSpaceFactory.h"
27 #include "xgi/Method.h"
28 #include "xoap/Method.h"
29 
30 #include <boost/lexical_cast.hpp>
31 #include <boost/shared_ptr.hpp>
32 
33 #include <cstdlib>
34 
35 using namespace std;
36 using namespace stor;
37 
38 
39 StorageManager::StorageManager(xdaq::ApplicationStub * s) :
40  xdaq::Application(s)
41 {
42  LOG4CPLUS_INFO(this->getApplicationLogger(),"Making StorageManager");
43 
44  // bind all callback functions
49 
50  std::string errorMsg = "Exception in StorageManager constructor: ";
51  try
52  {
54  }
55  catch(std::exception &e)
56  {
57  errorMsg += e.what();
58  LOG4CPLUS_FATAL( getApplicationLogger(), e.what() );
59  XCEPT_RAISE( stor::exception::Exception, e.what() );
60  }
61  catch(...)
62  {
63  errorMsg += "unknown exception";
64  LOG4CPLUS_FATAL( getApplicationLogger(), errorMsg );
65  XCEPT_RAISE( stor::exception::Exception, errorMsg );
66  }
67 
69 }
70 
71 
73 {
74  i2o::bind(this,
77  XDAQ_ORGANIZATION_ID);
78  i2o::bind(this,
81  XDAQ_ORGANIZATION_ID);
82  i2o::bind(this,
85  XDAQ_ORGANIZATION_ID);
86  i2o::bind(this,
88  I2O_SM_DQM,
89  XDAQ_ORGANIZATION_ID);
90  i2o::bind(this,
92  I2O_EVM_LUMISECTION,
93  XDAQ_ORGANIZATION_ID);
94 }
95 
96 
98 {
99  xoap::bind( this,
101  "Configure",
102  XDAQ_NS_URI );
103  xoap::bind( this,
105  "Enable",
106  XDAQ_NS_URI );
107  xoap::bind( this,
109  "Stop",
110  XDAQ_NS_URI );
111  xoap::bind( this,
113  "Halt",
114  XDAQ_NS_URI );
115  xoap::bind( this,
117  "EmergencyStop",
118  XDAQ_NS_URI );
119 }
120 
121 
123 {
124  xgi::bind(this,&StorageManager::css, "styles.css");
125  xgi::bind(this,&StorageManager::defaultWebPage, "Default");
126  xgi::bind(this,&StorageManager::inputWebPage, "input");
127  xgi::bind(this,&StorageManager::storedDataWebPage, "storedData");
128  xgi::bind(this,&StorageManager::rbsenderWebPage, "rbsenderlist");
129  xgi::bind(this,&StorageManager::rbsenderDetailWebPage, "rbsenderdetail");
130  xgi::bind(this,&StorageManager::fileStatisticsWebPage, "fileStatistics");
131  xgi::bind(this,&StorageManager::dqmEventStatisticsWebPage,"dqmEventStatistics");
132  xgi::bind(this,&StorageManager::consumerStatisticsPage, "consumerStatistics" );
133  xgi::bind(this,&StorageManager::consumerListWebPage, "consumerList");
134  xgi::bind(this,&StorageManager::throughputWebPage, "throughputStatistics");
135 }
136 
137 
139 {
140  // event consumers
141  xgi::bind( this, &StorageManager::processConsumerRegistrationRequest, "registerConsumer" );
142  xgi::bind( this, &StorageManager::processConsumerHeaderRequest, "getregdata" );
143  xgi::bind( this, &StorageManager::processConsumerEventRequest, "geteventdata" );
144 
145  // dqm event consumers
146  xgi::bind(this,&StorageManager::processDQMConsumerRegistrationRequest, "registerDQMConsumer");
147  xgi::bind(this,&StorageManager::processDQMConsumerEventRequest, "getDQMeventdata");
148 }
149 
150 
152 {
153  sharedResources_.reset(new SharedResources());
154 
155  xdata::InfoSpace *ispace = getApplicationInfoSpace();
156  unsigned long instance = getApplicationDescriptor()->getInstance();
157  sharedResources_->configuration_.reset(new Configuration(ispace, instance));
158 
159  QueueConfigurationParams queueParams =
160  sharedResources_->configuration_->getQueueConfigurationParams();
161  sharedResources_->commandQueue_.
162  reset(new CommandQueue(queueParams.commandQueueSize_));
163  sharedResources_->fragmentQueue_.
164  reset(new FragmentQueue(queueParams.fragmentQueueSize_, queueParams.fragmentQueueMemoryLimitMB_ * 1024*1024));
165  sharedResources_->registrationQueue_.
167  sharedResources_->streamQueue_.
168  reset(new StreamQueue(queueParams.streamQueueSize_, queueParams.streamQueueMemoryLimitMB_ * 1024*1024));
169  sharedResources_->dqmEventQueue_.
170  reset(new DQMEventQueue(queueParams.dqmEventQueueSize_, queueParams.dqmEventQueueMemoryLimitMB_ * 1024*1024));
171 
172  sharedResources_->alarmHandler_.reset( new AlarmHandler(this, sharedResources_) );
173  sharedResources_->statisticsReporter_.reset(
175  );
176  sharedResources_->initMsgCollection_.reset(new InitMsgCollection());
177  sharedResources_->diskWriterResources_.reset(new DiskWriterResources());
178  sharedResources_->dqmEventProcessorResources_.reset(new DQMEventProcessorResources());
179 
180  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setFragmentQueue(sharedResources_->fragmentQueue_);
181  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setStreamQueue(sharedResources_->streamQueue_);
182  sharedResources_->statisticsReporter_->getThroughputMonitorCollection().setDQMEventQueue(sharedResources_->dqmEventQueue_);
183 
185  discardManager_.reset(new DiscardManager(getApplicationContext(),
186  getApplicationDescriptor(),
187  sharedResources_->statisticsReporter_->
188  getDataSenderMonitorCollection()));
189 
190  sharedResources_->registrationCollection_.reset( new RegistrationCollection() );
192  sharedResources_->statisticsReporter_->getEventConsumerMonitorCollection();
193  sharedResources_->eventQueueCollection_.reset( new EventQueueCollection( ecmc ) );
194 
196  sharedResources_->statisticsReporter_->getDQMConsumerMonitorCollection();
197  sharedResources_->dqmEventQueueCollection_.reset( new DQMEventQueueCollection( dcmc ) );
198 
199  consumerUtils_.reset( new ConsumerUtils_t(
200  sharedResources_->configuration_,
201  sharedResources_->registrationCollection_,
202  sharedResources_->registrationQueue_,
203  sharedResources_->initMsgCollection_,
204  sharedResources_->eventQueueCollection_,
205  sharedResources_->dqmEventQueueCollection_,
206  sharedResources_->alarmHandler_
207  ) );
208 
210  getApplicationDescriptor(), sharedResources_));
211 
212 }
213 
214 
216 {
217 
218  // Start the workloops
219  try
220  {
222  diskWriter_.reset( new DiskWriter(this, sharedResources_) );
224  sharedResources_->statisticsReporter_->startWorkLoop("theStatisticsReporter");
225  fragmentProcessor_->startWorkLoop("theFragmentProcessor");
226  diskWriter_->startWorkLoop("theDiskWriter");
227  dqmEventProcessor_->startWorkLoop("theDQMEventProcessor");
228  }
229  catch(xcept::Exception &e)
230  {
231  sharedResources_->alarmHandler_->moveToFailedState( e );
232  }
233  catch(std::exception &e)
234  {
235  XCEPT_DECLARE(stor::exception::Exception,
236  sentinelException, e.what());
237  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
238  }
239  catch(...)
240  {
241  std::string errorMsg = "Unknown exception when starting the workloops";
242  XCEPT_DECLARE(stor::exception::Exception,
243  sentinelException, errorMsg);
244  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
245  }
246 }
247 
248 
250 // I2O call back functions //
252 
253 void StorageManager::receiveRegistryMessage(toolbox::mem::Reference *ref)
254 {
255  I2OChain i2oChain(ref);
256 
257  // Set the I2O message pool pointer. Only done for init messages.
258  ThroughputMonitorCollection& throughputMonCollection =
259  sharedResources_->statisticsReporter_->getThroughputMonitorCollection();
260  throughputMonCollection.setMemoryPoolPointer( ref->getBuffer()->getPool() );
261 
262  FragmentMonitorCollection& fragMonCollection =
263  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
264  fragMonCollection.getAllFragmentSizeMQ().addSample(
265  static_cast<double>( i2oChain.totalDataSize() ) / 0x100000
266  );
267 
268  sharedResources_->fragmentQueue_->enqWait(i2oChain);
269 }
270 
271 
272 void StorageManager::receiveDataMessage(toolbox::mem::Reference *ref)
273 {
274  I2OChain i2oChain(ref);
275 
276  FragmentMonitorCollection& fragMonCollection =
277  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
278  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
279 
280  sharedResources_->fragmentQueue_->enqWait(i2oChain);
281 
282 #ifdef STOR_DEBUG_DUPLICATE_MESSAGES
283  double r = rand()/static_cast<double>(RAND_MAX);
284  if (r < 0.001)
285  {
286  LOG4CPLUS_INFO(this->getApplicationLogger(), "Simulating duplicated data message");
287  receiveDataMessage(ref->duplicate());
288  }
289 #endif
290 }
291 
292 
293 void StorageManager::receiveErrorDataMessage(toolbox::mem::Reference *ref)
294 {
295  I2OChain i2oChain(ref);
296 
297  FragmentMonitorCollection& fragMonCollection =
298  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
299  fragMonCollection.addEventFragmentSample( i2oChain.totalDataSize() );
300 
301  sharedResources_->fragmentQueue_->enqWait(i2oChain);
302 }
303 
304 
305 void StorageManager::receiveDQMMessage(toolbox::mem::Reference *ref)
306 {
307  I2OChain i2oChain(ref);
308 
309  FragmentMonitorCollection& fragMonCollection =
310  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
311  fragMonCollection.addDQMEventFragmentSample( i2oChain.totalDataSize() );
312 
313  sharedResources_->fragmentQueue_->enqWait(i2oChain);
314 }
315 
316 
317 void StorageManager::receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
318 {
319  I2OChain i2oChain( ref );
320 
321  FragmentMonitorCollection& fragMonCollection =
322  sharedResources_->statisticsReporter_->getFragmentMonitorCollection();
323  fragMonCollection.addFragmentSample( i2oChain.totalDataSize() );
324 
325  RunMonitorCollection& runMonCollection =
326  sharedResources_->statisticsReporter_->getRunMonitorCollection();
327  runMonCollection.getEoLSSeenMQ().addSample( i2oChain.lumiSection() );
328 
329  sharedResources_->streamQueue_->enqWait( i2oChain );
330 }
331 
332 
334 // Web interface call back functions //
336 
339 {
340  smWebPageHelper_->css(in,out);
341 }
342 
343 
346 {
347  std::string errorMsg = "Failed to create the default webpage";
348 
349  try
350  {
351  smWebPageHelper_->defaultWebPage(out);
352  }
353  catch(std::exception &e)
354  {
355  errorMsg += ": ";
356  errorMsg += e.what();
357 
358  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
359  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
360  }
361  catch(...)
362  {
363  errorMsg += ": Unknown exception";
364 
365  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
366  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
367  }
368 }
369 
370 
373 {
374  std::string errorMsg = "Failed to create the I2O input webpage";
375 
376  try
377  {
378  smWebPageHelper_->inputWebPage(out);
379  }
380  catch(std::exception &e)
381  {
382  errorMsg += ": ";
383  errorMsg += e.what();
384 
385  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
386  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
387  }
388  catch(...)
389  {
390  errorMsg += ": Unknown exception";
391 
392  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
393  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
394  }
395 }
396 
397 
400 {
401  std::string errorMsg = "Failed to create the stored data webpage";
402 
403  try
404  {
405  smWebPageHelper_->storedDataWebPage(out);
406  }
407  catch(std::exception &e)
408  {
409  errorMsg += ": ";
410  errorMsg += e.what();
411 
412  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
413  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
414  }
415  catch(...)
416  {
417  errorMsg += ": Unknown exception";
418 
419  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
420  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
421  }
422 }
423 
424 
426  xgi::Output* out )
428 {
429 
430  std::string err_msg =
431  "Failed to create consumer statistics page";
432 
433  try
434  {
435  smWebPageHelper_->consumerStatistics(out);
436  }
437  catch( std::exception &e )
438  {
439  err_msg += ": ";
440  err_msg += e.what();
441  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
442  XCEPT_RAISE( xgi::exception::Exception, err_msg );
443  }
444  catch(...)
445  {
446  err_msg += ": Unknown exception";
447  LOG4CPLUS_ERROR( getApplicationLogger(), err_msg );
448  XCEPT_RAISE( xgi::exception::Exception, err_msg );
449  }
450 
451 }
452 
453 
456 {
457  std::string errorMsg = "Failed to create the data sender webpage";
458 
459  try
460  {
461  smWebPageHelper_->resourceBrokerOverview(out);
462  }
463  catch(std::exception &e)
464  {
465  errorMsg += ": ";
466  errorMsg += e.what();
467 
468  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
469  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
470  }
471  catch(...)
472  {
473  errorMsg += ": Unknown exception";
474 
475  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
476  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
477  }
478 
479 }
480 
481 
484 {
485  std::string errorMsg = "Failed to create the data sender webpage";
486 
487  try
488  {
489  long long localRBID = 0;
490  cgicc::Cgicc cgiWrapper(in);
491  cgicc::const_form_iterator updateRef = cgiWrapper.getElement("id");
492  if (updateRef != cgiWrapper.getElements().end())
493  {
494  std::string idString = updateRef->getValue();
495  localRBID = boost::lexical_cast<long long>(idString);
496  }
497 
498  smWebPageHelper_->resourceBrokerDetail(out, localRBID);
499  }
500  catch(std::exception &e)
501  {
502  errorMsg += ": ";
503  errorMsg += e.what();
504 
505  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
506  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
507  }
508  catch(...)
509  {
510  errorMsg += ": Unknown exception";
511 
512  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
513  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
514  }
515 
516 }
517 
518 
521 {
522  std::string errorMsg = "Failed to create the file statistics webpage";
523 
524  try
525  {
526  smWebPageHelper_->filesWebPage(out);
527  }
528  catch(std::exception &e)
529  {
530  errorMsg += ": ";
531  errorMsg += e.what();
532 
533  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
534  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
535  }
536  catch(...)
537  {
538  errorMsg += ": Unknown exception";
539 
540  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
541  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
542  }
543 
544 }
545 
546 
549 {
550  std::string errorMsg = "Failed to create the DQM event statistics webpage";
551 
552  try
553  {
554  smWebPageHelper_->dqmEventWebPage(out);
555  }
556  catch(std::exception &e)
557  {
558  errorMsg += ": ";
559  errorMsg += e.what();
560 
561  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
562  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
563  }
564  catch(...)
565  {
566  errorMsg += ": Unknown exception";
567 
568  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
569  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
570  }
571 }
572 
573 
576 {
577  std::string errorMsg = "Failed to create the throughput statistics webpage";
578 
579  try
580  {
581  smWebPageHelper_->throughputWebPage(out);
582  }
583  catch(std::exception &e)
584  {
585  errorMsg += ": ";
586  errorMsg += e.what();
587 
588  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
589  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
590  }
591  catch(...)
592  {
593  errorMsg += ": Unknown exception";
594 
595  LOG4CPLUS_ERROR(getApplicationLogger(), errorMsg);
596  XCEPT_RAISE(xgi::exception::Exception, errorMsg);
597  }
598 
599 }
600 
601 
602 // Leaving in for now but making it return an empty buffer:
605 {
606  out->getHTTPResponseHeader().addHeader( "Content-Type",
607  "application/octet-stream" );
608  out->getHTTPResponseHeader().addHeader( "Content-Transfer-Encoding",
609  "binary" );
610  char buff;
611  out->write( &buff, 0 );
612 }
613 
614 
616 // State Machine call back functions //
618 
619 xoap::MessageReference StorageManager::handleFSMSoapMessage( xoap::MessageReference msg )
621 {
622  std::string errorMsg;
623  xoap::MessageReference returnMsg;
624 
625  try {
626  errorMsg = "Failed to extract FSM event and parameters from SOAP message: ";
627  std::string command = soaputils::extractParameters(msg, this);
628 
629  errorMsg = "Failed to put a '" + command + "' state machine event into command queue: ";
630  if (command == "Configure")
631  {
632  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Configure() ) );
633  }
634  else if (command == "Enable")
635  {
636  if (sharedResources_->configuration_->streamConfigurationHasChanged())
637  {
638  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Reconfigure() ) );
639  }
640  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Enable() ) );
641  }
642  else if (command == "Stop")
643  {
644  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Stop() ) );
645  }
646  else if (command == "Halt")
647  {
648  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::Halt() ) );
649  }
650  else if (command == "EmergencyStop")
651  {
652  sharedResources_->commandQueue_->enqWait( stor::EventPtr_t( new stor::EmergencyStop() ) );
653  }
654  else
655  {
656  XCEPT_RAISE(stor::exception::StateMachine,
657  "Received an unknown state machine event '" + command + "'.");
658  }
659 
660  errorMsg = "Failed to create FSM SOAP reply message: ";
661  returnMsg = soaputils::createFsmSoapResponseMsg(command,
662  sharedResources_->statisticsReporter_->
663  getStateMachineMonitorCollection().externallyVisibleState());
664  }
665  catch (cms::Exception& e) {
666  errorMsg += e.explainSelf();
667  XCEPT_DECLARE(xoap::exception::Exception,
668  sentinelException, errorMsg);
669  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
670  throw sentinelException;
671  }
672  catch (xcept::Exception &e) {
673  XCEPT_DECLARE_NESTED(xoap::exception::Exception,
674  sentinelException, errorMsg, e);
675  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
676  throw sentinelException;
677  }
678  catch (std::exception& e) {
679  errorMsg += e.what();
680  XCEPT_DECLARE(xoap::exception::Exception,
681  sentinelException, errorMsg);
682  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
683  throw sentinelException;
684  }
685  catch (...) {
686  errorMsg += "Unknown exception";
687  XCEPT_DECLARE(xoap::exception::Exception,
688  sentinelException, errorMsg);
689  sharedResources_->alarmHandler_->moveToFailedState( sentinelException );
690  throw sentinelException;
691  }
692 
693  return returnMsg;
694 }
695 
696 
700 
701 void
704 {
705  consumerUtils_->processConsumerRegistrationRequest(in,out);
706 }
707 
708 
709 void
712 {
713  consumerUtils_->processConsumerHeaderRequest(in,out);
714 }
715 
716 
717 void
720 {
721  consumerUtils_->processConsumerEventRequest(in,out);
722 }
723 
724 
725 void
728 {
729  consumerUtils_->processDQMConsumerRegistrationRequest(in,out);
730 }
731 
732 
733 void
736 {
737  consumerUtils_->processDQMConsumerEventRequest(in,out);
738 }
739 
740 namespace stor {
742  // Specialization for ConsumerUtils //
744  template<>
745  void
748  {
749  writeHTTPHeaders( out );
750 
751  #ifdef STOR_DEBUG_CORRUPTED_EVENT_HEADER
752  double r = rand()/static_cast<double>(RAND_MAX);
753  if (r < 0.1)
754  {
755  std::cout << "Simulating corrupted event header" << std::endl;
757  h->protocolVersion_ = 1;
758  }
759  #endif // STOR_DEBUG_CORRUPTED_EVENT_HEADER
760 
761  const unsigned int nfrags = evt.fragmentCount();
762  for ( unsigned int i = 0; i < nfrags; ++i )
763  {
764  const unsigned long len = evt.dataSize( i );
765  unsigned char* location = evt.dataLocation( i );
766  out->write( (char*)location, len );
767  }
768  }
769 }
770 
771 
773 // *** Provides factory method for the instantiation of SM applications //
775 // This macro is depreciated:
776 XDAQ_INSTANTIATE(StorageManager)
777 
778 // One should use the XDAQ_INSTANTIATOR() in the header file
779 // and this one here. But this breaks the backward compatibility,
780 // as all xml configuration files would have to be changed to use
781 // 'stor::StorageManager' instead of 'StorageManager'.
782 // XDAQ_INSTANTIATOR_IMPL(stor::StorageManager)
783 
784 
785 
const MonitoredQuantity & getAllFragmentSizeMQ() const
void processConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
unsigned int fragmentCount() const
Definition: I2OChain.cc:284
int i
Definition: DBlmapReader.cc:9
#define I2O_SM_ERROR
Definition: i2oEvfMsgs.h:23
#define Input(cl)
Definition: vmac.h:189
#define I2O_SM_DQM
Definition: i2oEvfMsgs.h:25
unsigned int dqmEventQueueMemoryLimitMB_
Definition: Configuration.h:91
boost::scoped_ptr< DQMEventProcessor > dqmEventProcessor_
void receiveEndOfLumiSectionMessage(toolbox::mem::Reference *ref)
boost::scoped_ptr< DiskWriter > diskWriter_
#define I2O_SM_PREAMBLE
Definition: i2oEvfMsgs.h:21
static PFTauRenderPlugin instance
void inputWebPage(xgi::Input *in, xgi::Output *out)
virtual std::string explainSelf() const
Definition: Exception.cc:146
void receiveDataMessage(toolbox::mem::Reference *ref)
void addSample(const double &value=1)
ConsumerUtils< Configuration, EventQueueCollection > ConsumerUtils_t
unsigned int fragmentQueueMemoryLimitMB_
Definition: Configuration.h:93
ConcurrentQueue< RegPtr > RegistrationQueue
void consumerStatisticsPage(xgi::Input *in, xgi::Output *out)
unsigned int streamQueueMemoryLimitMB_
Definition: Configuration.h:96
SharedResourcesPtr sharedResources_
boost::shared_ptr< boost::statechart::event_base > EventPtr_t
Definition: CommandQueue.h:21
boost::scoped_ptr< FragmentProcessor > fragmentProcessor_
void writeConsumerEvent(xgi::Output *, const stor::I2OChain &) const
void consumerListWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference createFsmSoapResponseMsg(const std::string commandName, const std::string currentState)
Definition: SoapUtils.cc:38
ConcurrentQueue< I2OChain, KeepNewest< I2OChain > > DQMEventQueue
Definition: DQMEventQueue.h:22
ConcurrentQueue< I2OChain > StreamQueue
Definition: StreamQueue.h:21
void processConsumerHeaderRequest(xgi::Input *in, xgi::Output *out)
uint32_t lumiSection() const
Definition: I2OChain.cc:595
void fileStatisticsWebPage(xgi::Input *in, xgi::Output *out)
void rbsenderDetailWebPage(xgi::Input *in, xgi::Output *out)
uint8 protocolVersion_
Definition: EventMessage.h:60
void storedDataWebPage(xgi::Input *in, xgi::Output *out)
void setMemoryPoolPointer(toolbox::mem::Pool *)
const MonitoredQuantity & getEoLSSeenMQ() const
void css(xgi::Input *in, xgi::Output *out)
void dqmEventStatisticsWebPage(xgi::Input *in, xgi::Output *out)
tuple out
Definition: dbtoconf.py:99
void processDQMConsumerEventRequest(xgi::Input *in, xgi::Output *out)
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
ConcurrentQueue< EventPtr_t > CommandQueue
Definition: CommandQueue.h:22
QueueCollection< I2OChain > EventQueueCollection
unsigned long totalDataSize() const
Definition: I2OChain.cc:432
QueueCollection< DQMTopLevelFolder::Record > DQMEventQueueCollection
void receiveErrorDataMessage(toolbox::mem::Reference *ref)
#define Output(cl)
Definition: vmac.h:193
void rbsenderWebPage(xgi::Input *in, xgi::Output *out)
void processDQMConsumerRegistrationRequest(xgi::Input *in, xgi::Output *out)
Signal rand(Signal arg)
Definition: vlib.cc:442
unsigned long dataSize(int fragmentIndex) const
Definition: I2OChain.cc:438
std::string extractParameters(xoap::MessageReference, xdaq::Application *)
Definition: SoapUtils.cc:25
void receiveRegistryMessage(toolbox::mem::Reference *ref)
tuple cout
Definition: gather_cfg.py:121
void defaultWebPage(xgi::Input *in, xgi::Output *out)
void processConsumerEventRequest(xgi::Input *in, xgi::Output *out)
#define I2O_SM_DATA
Definition: i2oEvfMsgs.h:22
void throughputWebPage(xgi::Input *in, xgi::Output *out)
xoap::MessageReference handleFSMSoapMessage(xoap::MessageReference)
void addDQMEventFragmentSample(const double bytecount)
void reset(double vett[256])
Definition: TPedValues.cc:11
ConcurrentQueue< I2OChain > FragmentQueue
Definition: FragmentQueue.h:21
boost::scoped_ptr< ConsumerUtils_t > consumerUtils_
void addEventFragmentSample(const double bytecount)
boost::scoped_ptr< SMWebPageHelper > smWebPageHelper_
void receiveDQMMessage(toolbox::mem::Reference *ref)
void addFragmentSample(const double bytecount)
unsigned char * dataLocation(int fragmentIndex) const
Definition: I2OChain.cc:444