CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUResourceTable.cc
Go to the documentation of this file.
1 //
3 // FUResourceTable
4 // ---------------
5 //
6 // 12/10/2006 Philipp Schieferdecker <philipp.schieferdecker@cern.ch>
8 
9 
12 #include "EvffedFillerRB.h"
13 
14 #include "toolbox/task/WorkLoopFactory.h"
15 #include "interface/evb/i2oEVBMsgs.h"
16 #include "xcept/tools.h"
17 
18 
19 #include <fstream>
20 #include <sstream>
21 #include <iomanip>
22 #include <unistd.h>
23 
24 
25 using namespace evf;
26 using namespace std;
27 
28 
30 // construction/destruction
32 
33 //______________________________________________________________________________
34 FUResourceTable::FUResourceTable(bool segmentationMode,
35  UInt_t nbRawCells,
36  UInt_t nbRecoCells,
37  UInt_t nbDqmCells,
38  UInt_t rawCellSize,
39  UInt_t recoCellSize,
40  UInt_t dqmCellSize,
41  BUProxy *bu,
42  SMProxy *sm,
43  log4cplus::Logger logger,
44  unsigned int timeout,
45  EvffedFillerRB *frb,
46  xdaq::Application*app)
47  throw (evf::Exception)
48  : bu_(bu)
49  , sm_(sm)
50  , log_(logger)
51  , wlSendData_(0)
52  , asSendData_(0)
53  , wlSendDqm_(0)
54  , asSendDqm_(0)
55  , wlDiscard_(0)
56  , asDiscard_(0)
57  , shmBuffer_(0)
58  , nbDqmCells_(nbDqmCells)
59  , nbRawCells_(nbRawCells)
60  , nbRecoCells_(nbRecoCells)
61  , acceptSMDataDiscard_(0)
62  , acceptSMDqmDiscard_(0)
63  , doCrcCheck_(1)
64  , shutdownTimeout_(timeout)
65  , nbPending_(0)
66  , nbClientsToShutDown_(0)
67  , isReadyToShutDown_(true)
68  , isActive_(false)
69  , isHalting_(false)
70  , isStopping_(false)
71  , runNumber_(0xffffffff)
72  , lock_(toolbox::BSem::FULL)
73  , frb_(frb)
74  , app_(app)
75 {
76  initialize(segmentationMode,
77  nbRawCells,nbRecoCells,nbDqmCells,
78  rawCellSize,recoCellSize,dqmCellSize);
79 }
80 
81 
82 //______________________________________________________________________________
84 {
85  clear();
86  if(wlSendData_){
87  wlSendData_->cancel();
88  toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendData","waiting");
89  }
90  if(wlSendDqm_){
91  wlSendDqm_->cancel();
92  toolbox::task::getWorkLoopFactory()->removeWorkLoop("SendDqm","waiting");
93  }
94  if(wlDiscard_){
95  wlDiscard_->cancel();
96  toolbox::task::getWorkLoopFactory()->removeWorkLoop("Discard","waiting");
97  }
98  shmdt(shmBuffer_);
100  LOG4CPLUS_INFO(log_,"SHARED MEMORY SUCCESSFULLY RELEASED.");
101  if (0!=acceptSMDataDiscard_) delete [] acceptSMDataDiscard_;
102  if (0!= acceptSMDqmDiscard_) delete [] acceptSMDqmDiscard_;
103 }
104 
105 
107 // implementation of member functions
109 
110 //______________________________________________________________________________
111 void FUResourceTable::initialize(bool segmentationMode,
112  UInt_t nbRawCells,
113  UInt_t nbRecoCells,
114  UInt_t nbDqmCells,
115  UInt_t rawCellSize,
116  UInt_t recoCellSize,
117  UInt_t dqmCellSize)
118  throw (evf::Exception)
119 {
120  clear();
121 
122  shmBuffer_=FUShmBuffer::createShmBuffer(segmentationMode,
123  nbRawCells,nbRecoCells,nbDqmCells,
124  rawCellSize,recoCellSize,dqmCellSize);
125  if (0==shmBuffer_) {
126  string msg = "CREATION OF SHARED MEMORY SEGMENT FAILED!";
127  LOG4CPLUS_FATAL(log_,msg);
128  XCEPT_RAISE(evf::Exception,msg);
129  }
130 
131  for (UInt_t i=0;i<nbRawCells_;i++) {
132  resources_.push_back(new FUResource(i,log_,frb_,app_));
133  freeResourceIds_.push(i);
134  }
135 
136  acceptSMDataDiscard_ = new bool[nbRecoCells];
137  acceptSMDqmDiscard_ = new int[nbDqmCells];
138 
139  resetCounters();
140 }
141 
142 
143 //______________________________________________________________________________
145 {
146  try {
147  wlSendData_=
148  toolbox::task::getWorkLoopFactory()->getWorkLoop("SendData","waiting");
149  if (!wlSendData_->isActive()) wlSendData_->activate();
150  asSendData_=toolbox::task::bind(this,&FUResourceTable::sendData,"SendData");
151  wlSendData_->submit(asSendData_);
152  }
153  catch (xcept::Exception& e) {
154  string msg = "Failed to start workloop 'SendData'.";
155  XCEPT_RETHROW(evf::Exception,msg,e);
156  }
157 }
158 
159 
160 //______________________________________________________________________________
161 bool FUResourceTable::sendData(toolbox::task::WorkLoop* /* wl */)
162 {
163  bool reschedule=true;
164 
165  FUShmRecoCell* cell=shmBuffer_->recoCellToRead();
166 
167  if (0==cell->eventSize()) {
168  LOG4CPLUS_INFO(log_,"Don't reschedule sendData workloop.");
169  UInt_t cellIndex=cell->index();
170  shmBuffer_->finishReadingRecoCell(cell);
171  shmBuffer_->discardRecoCell(cellIndex);
172  reschedule=false;
173  }
174  else if (isHalting_) {
175  LOG4CPLUS_INFO(log_,"sendData: isHalting, discard recoCell.");
176  UInt_t cellIndex=cell->index();
177  shmBuffer_->finishReadingRecoCell(cell);
178  shmBuffer_->discardRecoCell(cellIndex);
179  }
180  else {
181  try {
182  if (cell->type()==0) {
183  UInt_t cellIndex = cell->index();
184  UInt_t cellOutModId = cell->outModId();
185  UInt_t cellFUProcId = cell->fuProcessId();
186  UInt_t cellFUGuid = cell->fuGuid();
187  UChar_t* cellPayloadAddr = cell->payloadAddr();
188  UInt_t cellEventSize = cell->eventSize();
189  shmBuffer_->finishReadingRecoCell(cell);
190 
191  lock();
192  nbPendingSMDiscards_++;
193  unlock();
194 
195  sendInitMessage(cellIndex,cellOutModId,cellFUProcId,cellFUGuid,
196  cellPayloadAddr,cellEventSize);
197  }
198  else if (cell->type()==1) {
199  UInt_t cellIndex = cell->index();
200  UInt_t cellRawIndex = cell->rawCellIndex();
201  UInt_t cellRunNumber = cell->runNumber();
202  UInt_t cellEvtNumber = cell->evtNumber();
203  UInt_t cellOutModId = cell->outModId();
204  UInt_t cellFUProcId = cell->fuProcessId();
205  UInt_t cellFUGuid = cell->fuGuid();
206  UChar_t *cellPayloadAddr = cell->payloadAddr();
207  UInt_t cellEventSize = cell->eventSize();
208  shmBuffer_->finishReadingRecoCell(cell);
209 
210  lock();
211  nbPendingSMDiscards_++;
212  resources_[cellRawIndex]->incNbSent();
213  if (resources_[cellRawIndex]->nbSent()==1) nbSent_++;
214  unlock();
215 
216  sendDataEvent(cellIndex,cellRunNumber,cellEvtNumber,cellOutModId,
217  cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
218  }
219  else if (cell->type()==2) {
220  UInt_t cellIndex = cell->index();
221  UInt_t cellRawIndex = cell->rawCellIndex();
222  //UInt_t cellRunNumber = cell->runNumber();
223  UInt_t cellEvtNumber = cell->evtNumber();
224  UInt_t cellFUProcId = cell->fuProcessId();
225  UInt_t cellFUGuid = cell->fuGuid();
226  UChar_t *cellPayloadAddr = cell->payloadAddr();
227  UInt_t cellEventSize = cell->eventSize();
228  shmBuffer_->finishReadingRecoCell(cell);
229 
230  lock();
231  nbPendingSMDiscards_++;
232  resources_[cellRawIndex]->incNbSent();
233  if (resources_[cellRawIndex]->nbSent()==1) { nbSent_++; nbSentError_++; }
234  unlock();
235 
236  sendErrorEvent(cellIndex,runNumber_,cellEvtNumber,
237  cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
238  }
239  else {
240  string errmsg="Unknown RecoCell type (neither INIT/DATA/ERROR).";
241  XCEPT_RAISE(evf::Exception,errmsg);
242  }
243  }
244  catch (xcept::Exception& e) {
245  LOG4CPLUS_FATAL(log_,"Failed to send EVENT DATA to StorageManager: "
246  <<xcept::stdformat_exception_history(e));
247  reschedule=false;
248  }
249  }
250 
251  return reschedule;
252 }
253 
254 
255 //______________________________________________________________________________
257 {
258  try {
259  wlSendDqm_=toolbox::task::getWorkLoopFactory()->getWorkLoop("SendDqm","waiting");
260  if (!wlSendDqm_->isActive()) wlSendDqm_->activate();
261  asSendDqm_=toolbox::task::bind(this,&FUResourceTable::sendDqm,"SendDqm");
262  wlSendDqm_->submit(asSendDqm_);
263  }
264  catch (xcept::Exception& e) {
265  string msg = "Failed to start workloop 'SendDqm'.";
266  XCEPT_RETHROW(evf::Exception,msg,e);
267  }
268 }
269 
270 
271 //______________________________________________________________________________
272 bool FUResourceTable::sendDqm(toolbox::task::WorkLoop* /* wl */)
273 {
274  bool reschedule=true;
275 
276  FUShmDqmCell* cell=shmBuffer_->dqmCellToRead();
277  dqm::State_t state=shmBuffer_->dqmState(cell->index());
278 
279  if (state==dqm::EMPTY) {
280  LOG4CPLUS_WARN(log_,"Don't reschedule sendDqm workloop.");
281  std::cout << "shut down dqm workloop " << std::endl;
282  UInt_t cellIndex=cell->index();
283  shmBuffer_->finishReadingDqmCell(cell);
284  shmBuffer_->discardDqmCell(cellIndex);
285  reschedule=false;
286  }
287  else if (isHalting_) {
288  UInt_t cellIndex=cell->index();
289  shmBuffer_->finishReadingDqmCell(cell);
290  shmBuffer_->discardDqmCell(cellIndex);
291  }
292  else {
293  try {
294  UInt_t cellIndex = cell->index();
295  UInt_t cellRunNumber = cell->runNumber();
296  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
297  UInt_t cellFolderId = cell->folderId();
298  UInt_t cellFUProcId = cell->fuProcessId();
299  UInt_t cellFUGuid = cell->fuGuid();
300  UChar_t *cellPayloadAddr = cell->payloadAddr();
301  UInt_t cellEventSize = cell->eventSize();
302  sendDqmEvent(cellIndex,cellRunNumber,cellEvtAtUpdate,cellFolderId,
303  cellFUProcId,cellFUGuid,cellPayloadAddr,cellEventSize);
304  shmBuffer_->finishReadingDqmCell(cell);
305  }
306  catch (xcept::Exception& e) {
307  LOG4CPLUS_FATAL(log_,"Failed to send DQM DATA to StorageManager: "
308  <<xcept::stdformat_exception_history(e));
309  reschedule=false;
310  }
311  }
312 
313  return reschedule;
314 }
315 
316 
317 //______________________________________________________________________________
319 {
320  try {
321  LOG4CPLUS_INFO(log_,"Start 'discard' workloop.");
322  wlDiscard_=toolbox::task::getWorkLoopFactory()->getWorkLoop("Discard","waiting");
323  if (!wlDiscard_->isActive()) wlDiscard_->activate();
324  asDiscard_=toolbox::task::bind(this,&FUResourceTable::discard,"Discard");
325  wlDiscard_->submit(asDiscard_);
326  isActive_=true;
327  }
328  catch (xcept::Exception& e) {
329  string msg = "Failed to start workloop 'Discard'.";
330  XCEPT_RETHROW(evf::Exception,msg,e);
331  }
332  isReadyToShutDown_=false;
333 }
334 
335 
336 //______________________________________________________________________________
337 bool FUResourceTable::discard(toolbox::task::WorkLoop* /* wl */)
338 {
339  FUShmRawCell* cell =shmBuffer_->rawCellToDiscard();
340  evt::State_t state=shmBuffer_->evtState(cell->index());
341 
342  bool reschedule =true;
343  bool shutDown =(state==evt::STOP);
344  bool isLumi =(state==evt::LUMISECTION);
345  UInt_t fuResourceId=cell->fuResourceId();
346  UInt_t buResourceId=cell->buResourceId();
347 
348  // std::cout << "discard loop, state, shutDown, isLumi " << state << " "
349  // << shutDown << " " << isLumi << std::endl;
350  // std::cout << "resource ids " << fuResourceId << " " << buResourceId << std::endl;
351 
352  if (shutDown) {
353  LOG4CPLUS_INFO(log_,"nbClientsToShutDown = "<<nbClientsToShutDown_);
354  if (nbClientsToShutDown_>0) --nbClientsToShutDown_;
355  if (nbClientsToShutDown_==0) {
356  LOG4CPLUS_INFO(log_,"Don't reschedule discard-workloop.");
357  isActive_ =false;
358  reschedule=false;
359  }
360  }
361 
362  shmBuffer_->discardRawCell(cell);
363 
364  if (!shutDown && !isLumi) {
365  resources_[fuResourceId]->release();
366  lock();
367  freeResourceIds_.push(fuResourceId);
368  assert(freeResourceIds_.size()<=resources_.size());
369  unlock();
370 
371  if (!isHalting_) {
372  sendDiscard(buResourceId);
373  if(!isStopping_)sendAllocate();
374  }
375  }
376 
377  if (!reschedule) {
378  std::cout << " entered shutdown cycle " << std::endl;
379  shmBuffer_->writeRecoEmptyEvent();
380  UInt_t count=0;
381  while (count<100) {
382  std::cout << " shutdown cycle " <<shmBuffer_->nClients() << " "
383  << FUShmBuffer::shm_nattch(shmBuffer_->shmid()) << std::endl;
384  if (shmBuffer_->nClients()==0&&
385  FUShmBuffer::shm_nattch(shmBuffer_->shmid())==1) {
386  // isReadyToShutDown_ = true;
387  break;
388  }
389  else {
390  count++;
391  std::cout << " shutdown cycle attempt " << count << std::endl;
392  LOG4CPLUS_DEBUG(log_,"FUResourceTable: Wait for all clients to detach,"
393  <<" nClients="<<shmBuffer_->nClients()
394  <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
395  <<" ("<<count<<")");
396  ::usleep(shutdownTimeout_);
397  if(count*shutdownTimeout_ > 10000000)
398  LOG4CPLUS_WARN(log_,"FUResourceTable:LONG Wait (>10s) for all clients to detach,"
399  <<" nClients="<<shmBuffer_->nClients()
400  <<" nattch="<<FUShmBuffer::shm_nattch(shmBuffer_->shmid())
401  <<" ("<<count<<")");
402 
403  }
404  }
405  bool allEmpty = false;
406  std::cout << "Checking if all dqm cells are empty " << std::endl;
407  while(!allEmpty){
408  UInt_t n=nbDqmCells_;
409  allEmpty = true;
410  shmBuffer_->lock();
411  for (UInt_t i=0;i<n;i++) {
412  dqm::State_t state=shmBuffer_->dqmState(i);
413  if(state!=dqm::EMPTY) allEmpty = false;
414  }
415  shmBuffer_->unlock();
416  }
417  std::cout << "Making sure there are no dqm pending discards " << std::endl;
418  if(nbPendingSMDqmDiscards_ != 0)
419  {
420  LOG4CPLUS_WARN(log_,"FUResourceTable: pending DQM discards not zero: ="
421  << nbPendingSMDqmDiscards_ << " while cells are all empty. This may cause problems at next start ");
422 
423  }
424  shmBuffer_->writeDqmEmptyEvent();
425  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
426  // sendDqm loop has been shut down as well
427  }
428 
429  return reschedule;
430 }
431 
432 
433 
434 //______________________________________________________________________________
436 {
437  assert(!freeResourceIds_.empty());
438 
439  lock();
440  UInt_t fuResourceId=freeResourceIds_.front();
441  freeResourceIds_.pop();
442  nbPending_++;
443  nbAllocated_++;
444  unlock();
445 
446  return fuResourceId;
447 }
448 
449 
450 //______________________________________________________________________________
452 {
453  bool eventComplete=false;
454 
455  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
456  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
457 
458  UInt_t fuResourceId=(UInt_t)block->fuTransactionId;
459  UInt_t buResourceId=(UInt_t)block->buResourceId;
460  FUResource* resource =resources_[fuResourceId];
461 
462  // allocate resource
463  if (!resource->fatalError()&&!resource->isAllocated()) {
464  FUShmRawCell* cell=shmBuffer_->rawCellToWrite();
465  resource->allocate(cell);
466  timeval now;
467  gettimeofday(&now,0);
468 
469  frb_->setRBTimeStamp(((uint64_t)(now.tv_sec) << 32) + (uint64_t)(now.tv_usec));
470 
471  frb_->setRBEventCount(nbCompleted_);
472 
473  if (doCrcCheck_>0&&0==nbAllocated_%doCrcCheck_) resource->doCrcCheck(true);
474  else resource->doCrcCheck(false);
475  }
476 
477 
478  // keep building this resource if it is healthy
479  if (!resource->fatalError()) {
480  resource->process(bufRef);
481  lock();
482  nbErrors_ +=resource->nbErrors();
483  nbCrcErrors_+=resource->nbCrcErrors();
484  unlock();
485 
486  // make resource available for pick-up
487  if (resource->isComplete()) {
488  lock();
489  nbCompleted_++;
490  nbPending_--;
491  unlock();
492  if (doDumpEvents_>0&&nbCompleted_%doDumpEvents_==0)
493  dumpEvent(resource->shmCell());
494  shmBuffer_->finishWritingRawCell(resource->shmCell());
495  eventComplete=true;
496  }
497 
498  }
499  // bad event, release msg, and the whole resource if this was the last one
500  //else {
501  if (resource->fatalError()) {
502  bool lastMsg=isLastMessageOfEvent(bufRef);
503  if (lastMsg) {
504  shmBuffer_->releaseRawCell(resource->shmCell());
505  resource->release();
506  lock();
507  freeResourceIds_.push(fuResourceId);
508  nbDiscarded_++;
509  nbLost_++;
510  nbPending_--;
511  unlock();
512  bu_->sendDiscard(buResourceId);
513  sendAllocate();
514  }
515  bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
516  }
517 
518  return eventComplete;
519 }
520 
521 
522 //______________________________________________________________________________
524 {
526  msg=(I2O_FU_DATA_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
527  UInt_t recoIndex=msg->rbBufferID;
528 
529  if (acceptSMDataDiscard_[recoIndex]) {
530  lock();
531  nbPendingSMDiscards_--;
532  unlock();
533  acceptSMDataDiscard_[recoIndex] = false;
534 
535  if (!isHalting_) {
536  shmBuffer_->discardRecoCell(recoIndex);
537  bufRef->release();
538  }
539  }
540  else {
541  LOG4CPLUS_ERROR(log_,"Spurious DATA discard by StorageManager, skip!");
542  }
543 
544  if (isHalting_) {
545  bufRef->release();
546  return false;
547  }
548 
549  return true;
550 }
551 
552 
553 //______________________________________________________________________________
555 {
557  msg=(I2O_FU_DQM_DISCARD_MESSAGE_FRAME*)bufRef->getDataLocation();
558  UInt_t dqmIndex=msg->rbBufferID;
559  unsigned int ntries = 0;
560  while(shmBuffer_->dqmState(dqmIndex)!=dqm::SENT){
561  LOG4CPLUS_WARN(log_,"DQM discard for cell "<< dqmIndex << " which is not yer in SENT state - waiting");
562  ::usleep(10000);
563  if(ntries++>10){
564  LOG4CPLUS_ERROR(log_,"DQM cell " << dqmIndex
565  << " discard timed out while cell still in state " << shmBuffer_->dqmState(dqmIndex) );
566  bufRef->release();
567  return true;
568  }
569  }
570  if (acceptSMDqmDiscard_[dqmIndex]>0) {
571  acceptSMDqmDiscard_[dqmIndex]--;
572  if(nbPendingSMDqmDiscards_>0){
573  nbPendingSMDqmDiscards_--;
574  }
575  else {
576  LOG4CPLUS_WARN(log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
577  << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
578  }
579 
580  if (!isHalting_) {
581  shmBuffer_->discardDqmCell(dqmIndex);
582  bufRef->release();
583  }
584 
585  }
586  else {
587  LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
588  << " from StorageManager while cell is not accepting discards");
589  }
590 
591  if (isHalting_) {
592  bufRef->release();
593  return false;
594  }
595 
596  return true;
597 }
598 
599 
600 //______________________________________________________________________________
602 {
603  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *msg =
604  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *)bufRef->getDataLocation();
605  //make sure to fill up the shmem so no process will miss it
606  // but processes will have to handle duplicates
607 
608  for(unsigned int i = 0; i < nbRawCells_; i++)
609  shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
610 }
611 
612 
613 //______________________________________________________________________________
615 {
616  FUShmRawCell* cell=shmBuffer_->rawCellToRead();
617  UInt_t fuResourceId=cell->fuResourceId();
618  shmBuffer_->finishReadingRawCell(cell);
619  shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
620 }
621 
622 
623 //______________________________________________________________________________
625 {
626  bool retval = false;
627  vector<pid_t> pids=cellPrcIds();
628  UInt_t iRawCell=pids.size();
629  for (UInt_t i=0;i<pids.size();i++) { if (pid==pids[i]) { iRawCell=i; break; } }
630 
631  if (iRawCell<pids.size()){
632  shmBuffer_->writeErrorEventData(runNumber,pid,iRawCell);
633  retval = true;
634  }
635  else
636  LOG4CPLUS_WARN(log_,"No raw data to send to error stream for process " << pid);
637  shmBuffer_->removeClientPrcId(pid);
638  return retval;
639 }
640 
641 
642 //______________________________________________________________________________
644 {
645  ostringstream oss; oss<<"/tmp/evt"<<cell->evtNumber()<<".dump";
646  ofstream fout(oss.str().c_str());
647  fout.fill('0');
648 
649  fout<<"#\n# evt "<<cell->evtNumber()<<"\n#\n"<<endl;
650  for (unsigned int i=0;i<cell->nFed();i++) {
651  if (cell->fedSize(i)==0) continue;
652  fout<<"# fedid "<<i<<endl;
653  unsigned char* addr=cell->fedAddr(i);
654  for (unsigned int j=0;j<cell->fedSize(i);j++) {
655  fout<<setiosflags(ios::right)<<setw(2)<<hex<<(int)(*addr)<<dec;
656  if ((j+1)%8) fout<<" "; else fout<<endl;
657  ++addr;
658  }
659  fout<<endl;
660  }
661  fout.close();
662 }
663 
664 
665 //______________________________________________________________________________
667 {
668  isStopping_ = true;
669  shutDownClients();
670 }
671 
672 
673 //______________________________________________________________________________
675 {
676  isHalting_=true;
677  shutDownClients();
678 }
679 
680 
681 //______________________________________________________________________________
683 {
684  nbClientsToShutDown_ = nbClients();
685  isReadyToShutDown_ = false;
686 
687  if (nbClientsToShutDown_==0) {
688  LOG4CPLUS_INFO(log_,"No clients to shut down. Checking if there are raw cells not assigned to any process yet");
689  UInt_t n=nbResources();
690  for (UInt_t i=0;i<n;i++) {
691  evt::State_t state=shmBuffer_->evtState(i);
692  if (state!=evt::EMPTY){
693  LOG4CPLUS_WARN(log_,"Schedule discard at STOP for orphaned event in state "
694  << state);
695  shmBuffer_->scheduleRawCellForDiscardServerSide(i);
696  }
697  }
698  shmBuffer_->scheduleRawEmptyCellForDiscard();
699  }
700  else {
701  UInt_t n=nbClientsToShutDown_;
702  for (UInt_t i=0;i<n;++i) shmBuffer_->writeRawEmptyEvent();
703  }
704 }
705 
706 
707 //______________________________________________________________________________
709 {
710  for (UInt_t i=0;i<resources_.size();i++) {
711  resources_[i]->release();
712  delete resources_[i];
713  }
714  resources_.clear();
715  while (!freeResourceIds_.empty()) freeResourceIds_.pop();
716 }
717 
718 
719 //______________________________________________________________________________
721 {
722  if (0!=shmBuffer_) {
723  for (UInt_t i=0;i<shmBuffer_->nRecoCells();i++) acceptSMDataDiscard_[i]= false;
724  for (UInt_t i=0;i<shmBuffer_->nDqmCells();i++) acceptSMDqmDiscard_[i] = 0;
725  }
726 
727  nbAllocated_ =nbPending_;
728  nbCompleted_ =0;
729  nbSent_ =0;
730  nbSentError_ =0;
731  nbSentDqm_ =0;
732  nbPendingSMDiscards_ =0;
733  nbPendingSMDqmDiscards_=0;
734  nbDiscarded_ =0;
735  nbLost_ =0;
736 
737  nbErrors_ =0;
738  nbCrcErrors_ =0;
739  nbAllocSent_ =0;
740 
741  sumOfSquares_ =0;
742  sumOfSizes_ =0;
743  isStopping_ =false;
744 }
745 
746 
747 //______________________________________________________________________________
749 {
750  UInt_t result(0);
751  if (0!=shmBuffer_) result=shmBuffer_->nClients();
752  return result;
753 }
754 
755 
756 //______________________________________________________________________________
757 vector<pid_t> FUResourceTable::clientPrcIds() const
758 {
759  vector<pid_t> result;
760  if (0!=shmBuffer_) {
761  UInt_t n=nbClients();
762  for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->clientPrcId(i));
763  }
764  return result;
765 }
766 
767 
768 //______________________________________________________________________________
770 {
771  stringstream ss;
772  if (0!=shmBuffer_) {
773  UInt_t n=nbClients();
774  for (UInt_t i=0;i<n;i++) {
775  if (i>0) ss<<",";
776  ss<<shmBuffer_->clientPrcId(i);
777  }
778  }
779  return ss.str();
780 }
781 
782 
783 //______________________________________________________________________________
784 vector<string> FUResourceTable::cellStates() const
785 {
786  vector<string> result;
787  if (0!=shmBuffer_) {
788  UInt_t n=nbResources();
789  shmBuffer_->lock();
790  for (UInt_t i=0;i<n;i++) {
791  evt::State_t state=shmBuffer_->evtState(i);
792  if (state==evt::EMPTY) result.push_back("EMPTY");
793  else if (state==evt::STOP) result.push_back("STOP");
794  else if (state==evt::RAWWRITING) result.push_back("RAWWRITING");
795  else if (state==evt::RAWWRITTEN) result.push_back("RAWWRITTEN");
796  else if (state==evt::RAWREADING) result.push_back("RAWREADING");
797  else if (state==evt::RAWREAD) result.push_back("RAWREAD");
798  else if (state==evt::PROCESSING) result.push_back("PROCESSING");
799  else if (state==evt::PROCESSED) result.push_back("PROCESSED");
800  else if (state==evt::RECOWRITING)result.push_back("RECOWRITING");
801  else if (state==evt::RECOWRITTEN)result.push_back("RECOWRITTEN");
802  else if (state==evt::SENDING) result.push_back("SENDING");
803  else if (state==evt::SENT) result.push_back("SENT");
804  else if (state==evt::DISCARDING) result.push_back("DISCARDING");
805  }
806  shmBuffer_->unlock();
807  }
808  return result;
809 }
810 
811 vector<string> FUResourceTable::dqmCellStates() const
812 {
813  vector<string> result;
814  if (0!=shmBuffer_) {
815  UInt_t n=nbDqmCells_;
816  shmBuffer_->lock();
817  for (UInt_t i=0;i<n;i++) {
818  dqm::State_t state=shmBuffer_->dqmState(i);
819  if (state==dqm::EMPTY) result.push_back("EMPTY");
820  else if (state==dqm::WRITING) result.push_back("WRITING");
821  else if (state==dqm::WRITTEN) result.push_back("WRITTEN");
822  else if (state==dqm::SENDING) result.push_back("SENDING");
823  else if (state==dqm::SENT) result.push_back("SENT");
824  else if (state==dqm::DISCARDING) result.push_back("DISCARDING");
825  }
826  shmBuffer_->unlock();
827  }
828  return result;
829 }
830 
831 
832 //______________________________________________________________________________
833 vector<UInt_t> FUResourceTable::cellEvtNumbers() const
834 {
835  vector<UInt_t> result;
836  if (0!=shmBuffer_) {
837  UInt_t n=nbResources();
838  shmBuffer_->lock();
839  for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtNumber(i));
840  shmBuffer_->unlock();
841  }
842  return result;
843 }
844 
845 
846 //______________________________________________________________________________
847 vector<pid_t> FUResourceTable::cellPrcIds() const
848 {
849  vector<pid_t> result;
850  if (0!=shmBuffer_) {
851  UInt_t n=nbResources();
852  shmBuffer_->lock();
853  for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtPrcId(i));
854  shmBuffer_->unlock();
855  }
856  return result;
857 }
858 
859 
860 //______________________________________________________________________________
861 vector<time_t> FUResourceTable::cellTimeStamps() const
862 {
863  vector<time_t> result;
864  if (0!=shmBuffer_) {
865  UInt_t n=nbResources();
866  shmBuffer_->lock();
867  for (UInt_t i=0;i<n;i++) result.push_back(shmBuffer_->evtTimeStamp(i));
868  shmBuffer_->unlock();
869  }
870  return result;
871 }
872 
873 
875 // implementation of private member functions
877 
878 //______________________________________________________________________________
880 {
881  UInt_t nbFreeSlots = this->nbFreeSlots();
882  UInt_t nbFreeSlotsMax = resources_.size()/2;
883  if (nbFreeSlots>nbFreeSlotsMax) {
884  UIntVec_t fuResourceIds;
885  for (UInt_t i=0;i<nbFreeSlots;i++)
886  fuResourceIds.push_back(allocateResource());
887  bu_->sendAllocate(fuResourceIds);
888  nbAllocSent_++;
889  }
890 }
891 
892 
893 //______________________________________________________________________________
895 {
896  bu_->sendDiscard(buResourceId);
897  nbDiscarded_++;
898 }
899 
900 
901 //______________________________________________________________________________
903  UInt_t outModId,
904  UInt_t fuProcessId,
905  UInt_t fuGuid,
906  UChar_t *data,
907  UInt_t dataSize)
908 {
909  if (0==sm_) {
910  LOG4CPLUS_ERROR(log_,"No StorageManager, DROP INIT MESSAGE!");
911  }
912  else {
913  acceptSMDataDiscard_[fuResourceId] = true;
914  UInt_t nbBytes=sm_->sendInitMessage(fuResourceId,outModId,fuProcessId,
915  fuGuid,data,dataSize);
916  sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
917  sumOfSizes_ +=nbBytes;
918  }
919 }
920 
921 
922 //______________________________________________________________________________
925  UInt_t evtNumber,
926  UInt_t outModId,
927  UInt_t fuProcessId,
928  UInt_t fuGuid,
929  UChar_t *data,
930  UInt_t dataSize)
931 {
932  if (0==sm_) {
933  LOG4CPLUS_ERROR(log_,"No StorageManager, DROP DATA EVENT!");
934  }
935  else {
936  acceptSMDataDiscard_[fuResourceId] = true;
937  UInt_t nbBytes=sm_->sendDataEvent(fuResourceId,runNumber,evtNumber,
938  outModId,fuProcessId,fuGuid,
939  data,dataSize);
940  sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
941  sumOfSizes_ +=nbBytes;
942  }
943 }
944 
945 
946 //______________________________________________________________________________
949  UInt_t evtNumber,
950  UInt_t fuProcessId,
951  UInt_t fuGuid,
952  UChar_t *data,
953  UInt_t dataSize)
954 {
955  if (0==sm_) {
956  LOG4CPLUS_ERROR(log_,"No StorageManager, DROP ERROR EVENT!");
957  }
958  else {
959  acceptSMDataDiscard_[fuResourceId] = true;
960  UInt_t nbBytes=sm_->sendErrorEvent(fuResourceId,runNumber,evtNumber,
961  fuProcessId,fuGuid,data,dataSize);
962  sumOfSquares_+=(uint64_t)nbBytes*(uint64_t)nbBytes;
963  sumOfSizes_ +=nbBytes;
964  }
965 
966 // if (0!=shmBuffer_) {
967 // UInt_t n=nbDqmCells_;
968 
969 // for (UInt_t i=0;i<n;i++) {
970 // if(shmBuffer_->dqmCell(i)->fuProcessId()==fuProcessId)
971 // {
972 // if(shmBuffer_->dqmState(i)!=dqm::SENT){
973 // shmBuffer_->setDqmState(i,dqm::SENT);
974 // shmBuffer_->discardDqmCell(i);
975 // acceptSMDqmDiscard_[i] = false;
976 // }
977 // }
978 // }
979 // n=nbRecoCells_;
980 // for (UInt_t i=0;i<n;i++) {
981 // if(shmBuffer_->recoCell(i)->fuProcessId()==fuProcessId)
982 // {
983 // shmBuffer_->discardOrphanedRecoCell(i);
984 // }
985 // }
986 
987 // }
988 }
989 
990 
991 //______________________________________________________________________________
994  UInt_t evtAtUpdate,
995  UInt_t folderId,
996  UInt_t fuProcessId,
997  UInt_t fuGuid,
998  UChar_t* data,
999  UInt_t dataSize)
1000 {
1001  if (0==sm_) {
1002  LOG4CPLUS_WARN(log_,"No StorageManager, DROP DQM EVENT.");
1003  }
1004  else {
1005  sm_->sendDqmEvent(fuDqmId,runNumber,evtAtUpdate,folderId,
1006  fuProcessId,fuGuid,data,dataSize);
1007 
1008  nbPendingSMDqmDiscards_++;
1009 
1010  acceptSMDqmDiscard_[fuDqmId]++;
1011  if(acceptSMDqmDiscard_[fuDqmId]>1)
1012  LOG4CPLUS_WARN(log_,"DQM Cell " << fuDqmId << " being sent more than once for folder "
1013  << folderId << " process " << fuProcessId << " guid " << fuGuid);
1014  nbSentDqm_++;
1015  }
1016 }
1017 
1018 
1019 //______________________________________________________________________________
1021 {
1022  while (0!=bufRef->getNextReference()) bufRef=bufRef->getNextReference();
1023 
1024  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block=
1025  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*)bufRef->getDataLocation();
1026 
1027  UInt_t iBlock =block->blockNb;
1028  UInt_t nBlock =block->nbBlocksInSuperFragment;
1029  UInt_t iSuperFrag=block->superFragmentNb;
1030  UInt_t nSuperFrag=block->nbSuperFragmentsInEvent;
1031 
1032  return ((iSuperFrag==nSuperFrag-1)&&(iBlock==nBlock-1));
1033 }
1034 
1035 //______________________________________________________________________________
1037 {
1038  for (UInt_t i=0;i<resources_.size();i++) {
1039  resources_[i]->scheduleCRCError();
1040  }
1041 }
1043 {
1044  std::cout << "Workloop status===============" << std::endl;
1045  std::cout << "==============================" << std::endl;
1046  if(wlSendData_!=0)
1047  std::cout << "SendData -> " << wlSendData_->isActive() << std::endl;
1048  if(wlSendDqm_!=0)
1049  std::cout << "SendDqm -> " << wlSendDqm_->isActive() << std::endl;
1050  if(wlDiscard_!=0)
1051  std::cout << "Discard -> " << wlDiscard_->isActive() << std::endl;
1052  std::cout << "Workloops Active -> " << isActive_ << std::endl;
1053 
1054 }
1055 
1056 
1058 {
1059  std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1060  << " more rawcells to read " << std::endl;
1061  while(shmBuffer_->nbRawCellsToRead()!=0){
1062  FUShmRawCell* newCell=shmBuffer_->rawCellToRead();
1063  std::cout << "lastResort: " << shmBuffer_->nbRawCellsToRead() << std::endl;
1064  shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
1065  std::cout << "lastResort: schedule raw cell for discard" << std::endl;
1066  }
1067 }
unsigned int index() const
Definition: FUShmRecoCell.h:22
static const char runNumber_[]
unsigned int fuGuid() const
Definition: FUShmDqmCell.h:27
bool discardDataEvent(MemRef_t *bufRef)
int i
Definition: DBlmapReader.cc:9
static int shm_nattch(int shmid)
void process(MemRef_t *bufRef)
Definition: FUResource.cc:143
std::vector< UInt_t > UIntVec_t
Definition: FUTypes.h:15
std::vector< UInt_t > cellEvtNumbers() const
unsigned int type() const
Definition: FUShmRecoCell.h:29
FUResourceTable(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *)
unsigned int index() const
Definition: FUShmDqmCell.h:22
bool fatalError() const
Definition: FUResource.h:62
unsigned int runNumber() const
Definition: FUShmRecoCell.h:24
std::vector< std::string > cellStates() const
unsigned int fuGuid() const
Definition: FUShmRecoCell.h:28
unsigned int folderId() const
Definition: FUShmDqmCell.h:25
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
UInt_t nbErrors(bool reset=true)
Definition: FUResource.cc:794
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
std::ostream & logger()
Definition: fwLog.cc:41
toolbox::mem::Reference MemRef_t
Definition: FUTypes.h:10
unsigned int evtAtUpdate() const
Definition: FUShmDqmCell.h:24
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
bool buildResource(MemRef_t *bufRef)
bool sendData(toolbox::task::WorkLoop *workLoop)
unsigned int fuProcessId() const
Definition: FUShmRecoCell.h:27
static bool releaseSharedMemory()
Definition: FUShmBuffer.cc:960
bool sendDqm(toolbox::task::WorkLoop *workLoop)
unsigned int evtNumber() const
Definition: FUShmRawCell.h:25
UInt_t nbClients() const
void sendDiscard(UInt_t buResourceId)
void dumpEvent(uint8 *buf)
Definition: DumpTools.cc:192
unsigned char * payloadAddr() const
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
unsigned int evtNumber() const
Definition: FUShmRecoCell.h:25
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
unsigned int fuProcessId() const
Definition: FUShmDqmCell.h:26
unsigned int runNumber() const
Definition: FUShmDqmCell.h:23
bool discard(toolbox::task::WorkLoop *workLoop)
unsigned char UChar_t
Definition: FUTypes.h:14
std::vector< time_t > cellTimeStamps() const
tuple result
Definition: query.py:137
std::vector< pid_t > clientPrcIds() const
int j
Definition: DBlmapReader.cc:9
bool isComplete() const
Definition: FUResource.h:146
block
Formatting index page's pieces.
Definition: Association.py:187
unsigned char * fedAddr(unsigned int i) const
Definition: FUShmRawCell.cc:94
static FUShmBuffer * createShmBuffer(bool semgmentationMode, unsigned int nRawCells, unsigned int nRecoCells, unsigned int nDqmCells, unsigned int rawCellSize=0x400000, unsigned int recoCellSize=0x400000, unsigned int dqmCellSize=0x400000)
Definition: FUShmBuffer.cc:844
bool isLastMessageOfEvent(MemRef_t *bufRef)
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned char * payloadAddr() const
Definition: FUShmDqmCell.cc:54
evf::FUShmRawCell * shmCell()
Definition: FUResource.h:77
std::string clientPrcIdsAsString() const
unsigned long long uint64_t
Definition: Time.h:15
bool isAllocated() const
Definition: FUResource.h:63
unsigned int eventSize() const
Definition: FUShmDqmCell.h:31
char state
Definition: procUtils.cc:75
void doCrcCheck(bool doCrcCheck)
Definition: FUResource.h:58
unsigned int index() const
Definition: FUShmRawCell.h:22
std::vector< pid_t > cellPrcIds() const
unsigned int nFed() const
Definition: FUShmRawCell.h:30
unsigned int outModId() const
Definition: FUShmRecoCell.h:26
void dumpEvent(evf::FUShmRawCell *cell)
unsigned int fuResourceId() const
Definition: FUShmRawCell.h:23
UInt_t nbCrcErrors(bool reset=true)
Definition: FUResource.cc:803
tuple cout
Definition: gather_cfg.py:41
unsigned int fedSize(unsigned int i) const
Definition: FUShmRawCell.cc:82
void postEndOfLumiSection(MemRef_t *bufRef)
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
std::vector< std::string > dqmCellStates() const
void allocate(FUShmRawCell *shmCell)
Definition: FUResource.cc:88
bool discardDqmEvent(MemRef_t *bufRef)
unsigned int buResourceId() const
Definition: FUShmRawCell.h:24
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)