CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
FUResourceQueue.cc
Go to the documentation of this file.
1 //
3 // >> UNDER DEVELOPMENT <<
4 //
5 // FUResourceQueue
6 // ---------------
7 //
8 // Main class for Message Queue interprocess communication.
9 //
10 // 28/10/2011 Andrei Spataru <andrei.cristian.spataru@cern.ch>
12 
18 
19 #include "EvffedFillerRB.h"
20 #include "interface/evb/i2oEVBMsgs.h"
21 #include "xcept/tools.h"
22 
23 #include <fstream>
24 #include <sstream>
25 #include <iomanip>
26 #include <unistd.h>
27 
28 using std::cout;
29 using std::endl;
30 using std::string;
31 using std::stringstream;
32 using std::vector;
33 using namespace evf;
34 
36 // construction/destruction
38 
39 //______________________________________________________________________________
40 FUResourceQueue::FUResourceQueue(bool segmentationMode, UInt_t nbRawCells,
41  UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
42  UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
43  SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
44  EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
45  IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
46  rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
47  logger, timeout, frb, app), msq_(99) {
48  //improve fix UInt_t and msq_(99)
49 
50  initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
51  rawCellSize, recoCellSize, dqmCellSize);
52 
53 }
54 
55 //______________________________________________________________________________
57  clear();
58 
59  // disconnect from queue
60  if (msq_.disconnect() == 0)
61  LOG4CPLUS_INFO(log_, "MESSAGE QUEUE SUCCESSFULLY RELEASED.");
62 
63  // needed??
64  /*
65  if (0 != acceptSMDataDiscard_)
66  delete[] acceptSMDataDiscard_;
67  if (0 != acceptSMDqmDiscard_)
68  delete[] acceptSMDqmDiscard_;
69  */
70 }
71 
73 // implementation of member functions
75 
76 //______________________________________________________________________________
77 void FUResourceQueue::initialize(bool segmentationMode, UInt_t nbRawCells,
78  UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
79  UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
80 
81  rawCellSize_ = rawCellSize;
82  recoCellSize_ = recoCellSize;
83  dqmCellSize_ = dqmCellSize;
84 
85  clear();
86 
87  if (0 == &msq_ || 0 == msq_.id()) {
88  string msg = "CREATION OF MESSAGE QUEUE FAILED!";
89  LOG4CPLUS_FATAL(log_, msg);
90  XCEPT_RAISE(evf::Exception, msg);
91  }
92 
93  cache_ = RawCache::getInstance();
94  cache_->initialise(nbRawCells, rawCellSize);
95 
96  // SEC need cap on max resources
97  for (UInt_t i = 0; i < nbRawCells_; i++) {
98  FUResource* newResource = new FUResource(i, log_, frb_, app_);
99  newResource->release(false);
100  resources_.push_back(newResource);
101  freeResourceIds_.push(i);
102  }
103 
104  //acceptSMDataDiscard_ = new bool[nbRecoCells];
105  //acceptSMDqmDiscard_ = new int[nbDqmCells];
106 
107  resetCounters();
108 }
109 
110 // work loop to send data events to storage manager
111 //______________________________________________________________________________
113  bool reschedule = true;
114 
115  //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
117 
118  bool rcvSuccess = msq_.rcvQuiet(recoMsg);
119  if (!rcvSuccess) {
120  cout << "RCV failed!" << endl;
121  ::sleep(5);
122  return reschedule;
123  }
124 
125  FUShmRecoCell* cell = recoMsg.recoCell();
126 
127  // event size 0 -> stop
128 
129  if (0 == cell->eventSize()) {
130  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
131  //UInt_t cellIndex = cell->index();
132  /*
133  shmBuffer_->finishReadingRecoCell(cell);
134  shmBuffer_->discardRecoCell(cellIndex);
135  */
136  reschedule = false;
137 
138  // halting
139  } else if (/*isHalting_*/false) {
140  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
141  //UInt_t cellIndex = cell->index();
142  /*
143  shmBuffer_->finishReadingRecoCell(cell);
144  shmBuffer_->discardRecoCell(cellIndex);
145  */
146 
147  } else {
148  try {
149  //init message
150  if (cell->type() == 0) {
151  UInt_t cellIndex = cell->index();
152  UInt_t cellOutModId = cell->outModId();
153  UInt_t cellFUProcId = cell->fuProcessId();
154  UInt_t cellFUGuid = cell->fuGuid();
155  UChar_t* cellPayloadAddr = cell->payloadAddr();
156  UInt_t cellEventSize = cell->eventSize();
157  UInt_t cellExpectedEPs = cell->nExpectedEPs();
158  //shmBuffer_->finishReadingRecoCell(cell);
159 
160  lock();
162  unlock();
163 
164  sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
165  cellFUGuid, cellPayloadAddr, cellEventSize,
166  cellExpectedEPs);
167 
168  //
169  // DATA event message
170  //
171  } else if (cell->type() == 1) {
172  UInt_t cellIndex = cell->index();
173  UInt_t cellRawIndex = cell->rawCellIndex();
174  UInt_t cellRunNumber = cell->runNumber();
175  UInt_t cellEvtNumber = cell->evtNumber();
176  UInt_t cellOutModId = cell->outModId();
177  UInt_t cellFUProcId = cell->fuProcessId();
178  UInt_t cellFUGuid = cell->fuGuid();
179  UChar_t *cellPayloadAddr = cell->payloadAddr();
180  UInt_t cellEventSize = cell->eventSize();
181  //shmBuffer_->finishReadingRecoCell(cell);
182  lock();
184  resources_[cellRawIndex]->incNbSent();
185  if (resources_[cellRawIndex]->nbSent() == 1)
186  nbSent_++;
187  unlock();
188 
189  sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
190  cellOutModId, cellFUProcId, cellFUGuid,
191  cellPayloadAddr, cellEventSize);
192  //
193  // ERROR event message
194  //
195  } else if (cell->type() == 2) {
196  UInt_t cellIndex = cell->index();
197  UInt_t cellRawIndex = cell->rawCellIndex();
198  //UInt_t cellRunNumber = cell->runNumber();
199  UInt_t cellEvtNumber = cell->evtNumber();
200  UInt_t cellFUProcId = cell->fuProcessId();
201  UInt_t cellFUGuid = cell->fuGuid();
202  UChar_t *cellPayloadAddr = cell->payloadAddr();
203  UInt_t cellEventSize = cell->eventSize();
204  //shmBuffer_->finishReadingRecoCell(cell);
205 
206  lock();
208  resources_[cellRawIndex]->incNbSent();
209  if (resources_[cellRawIndex]->nbSent() == 1) {
210  nbSent_++;
211  nbSentError_++;
212  }
213  unlock();
214 
215  sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
216  cellFUProcId, cellFUGuid, cellPayloadAddr,
217  cellEventSize);
218  } else {
219  string errmsg =
220  "Unknown RecoCell type (neither INIT/DATA/ERROR).";
221  XCEPT_RAISE(evf::Exception, errmsg);
222  }
223  } catch (xcept::Exception& e) {
224  LOG4CPLUS_FATAL(
225  log_,
226  "Failed to send EVENT DATA to StorageManager: "
227  << xcept::stdformat_exception_history(e));
228  reschedule = false;
229  }
230  }
231 
232  return reschedule;
233 }
234 
235 //______________________________________________________________________________
236 bool FUResourceQueue::sendDataWhileHalting(/*toolbox::task::WorkLoop* wl */) {
237  bool reschedule = true;
238 
239  //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
241 
242  bool rcvSuccess = msq_.rcvQuiet(recoMsg);
243  if (!rcvSuccess) {
244  cout << "RCV failed!" << endl;
245  ::sleep(5);
246  return reschedule;
247  }
248 
249  FUShmRecoCell* cell = recoMsg.recoCell();
250 
251  // event size 0 -> stop
252 
253  if (0 == cell->eventSize()) {
254  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
255  //UInt_t cellIndex = cell->index();
256  /*
257  shmBuffer_->finishReadingRecoCell(cell);
258  shmBuffer_->discardRecoCell(cellIndex);
259  */
260  reschedule = false;
261 
262  // halting
263  } else if (/*isHalting_*/true) {
264  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
265  //UInt_t cellIndex = cell->index();
266  /*
267  shmBuffer_->finishReadingRecoCell(cell);
268  shmBuffer_->discardRecoCell(cellIndex);
269  */
270 
271  } else {
272  try {
273  //init message
274  if (cell->type() == 0) {
275  UInt_t cellIndex = cell->index();
276  UInt_t cellOutModId = cell->outModId();
277  UInt_t cellFUProcId = cell->fuProcessId();
278  UInt_t cellFUGuid = cell->fuGuid();
279  UChar_t* cellPayloadAddr = cell->payloadAddr();
280  UInt_t cellEventSize = cell->eventSize();
281  UInt_t cellExpectedEPs = cell->nExpectedEPs();
282  //shmBuffer_->finishReadingRecoCell(cell);
283 
284  lock();
286  unlock();
287 
288  sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
289  cellFUGuid, cellPayloadAddr, cellEventSize,
290  cellExpectedEPs);
291 
292  //
293  // DATA event message
294  //
295  } else if (cell->type() == 1) {
296  UInt_t cellIndex = cell->index();
297  UInt_t cellRawIndex = cell->rawCellIndex();
298  UInt_t cellRunNumber = cell->runNumber();
299  UInt_t cellEvtNumber = cell->evtNumber();
300  UInt_t cellOutModId = cell->outModId();
301  UInt_t cellFUProcId = cell->fuProcessId();
302  UInt_t cellFUGuid = cell->fuGuid();
303  UChar_t *cellPayloadAddr = cell->payloadAddr();
304  UInt_t cellEventSize = cell->eventSize();
305 
306  //shmBuffer_->finishReadingRecoCell(cell);
307  lock();
309  resources_[cellRawIndex]->incNbSent();
310  if (resources_[cellRawIndex]->nbSent() == 1)
311  nbSent_++;
312  unlock();
313 
314  sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
315  cellOutModId, cellFUProcId, cellFUGuid,
316  cellPayloadAddr, cellEventSize);
317  //
318  // ERROR event message
319  //
320  } else if (cell->type() == 2) {
321  UInt_t cellIndex = cell->index();
322  UInt_t cellRawIndex = cell->rawCellIndex();
323  //UInt_t cellRunNumber = cell->runNumber();
324  UInt_t cellEvtNumber = cell->evtNumber();
325  UInt_t cellFUProcId = cell->fuProcessId();
326  UInt_t cellFUGuid = cell->fuGuid();
327  UChar_t *cellPayloadAddr = cell->payloadAddr();
328  UInt_t cellEventSize = cell->eventSize();
329  //shmBuffer_->finishReadingRecoCell(cell);
330 
331  lock();
333  resources_[cellRawIndex]->incNbSent();
334  if (resources_[cellRawIndex]->nbSent() == 1) {
335  nbSent_++;
336  nbSentError_++;
337  }
338  unlock();
339 
340  sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
341  cellFUProcId, cellFUGuid, cellPayloadAddr,
342  cellEventSize);
343  } else {
344  string errmsg =
345  "Unknown RecoCell type (neither INIT/DATA/ERROR).";
346  XCEPT_RAISE(evf::Exception, errmsg);
347  }
348  } catch (xcept::Exception& e) {
349  LOG4CPLUS_FATAL(
350  log_,
351  "Failed to send EVENT DATA to StorageManager: "
352  << xcept::stdformat_exception_history(e));
353  reschedule = false;
354  }
355  }
356 
357  return reschedule;
358 }
359 
360 // work loop to send dqm events to storage manager
361 //______________________________________________________________________________
363  bool reschedule = true;
364 
365  //FUShmDqmCell* cell = shmBuffer_->dqmCellToRead();
366  //dqm::State_t state = shmBuffer_->dqmState(cell->index());
367 
369 
370  bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
371  if (!rcvSuccess) {
372  cout << "RCV failed!" << endl;
373  ::sleep(5);
374  return reschedule;
375  }
376  FUShmDqmCell* cell = dqmMsg.dqmCell();
377 
378  // concept add stop messages (there is no more "state")
379  //if (state == dqm::EMPTY) {
380  if (false) {
381  LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
382  cout << "shut down dqm workloop " << endl;
383  //UInt_t cellIndex = cell->index();
384  /*
385  shmBuffer_->finishReadingDqmCell(cell);
386  shmBuffer_->discardDqmCell(cellIndex);
387  */
388  reschedule = false;
389  } else if (/*isHalting_*/false) {
390  //UInt_t cellIndex = cell->index();
391  /*
392  shmBuffer_->finishReadingDqmCell(cell);
393  shmBuffer_->discardDqmCell(cellIndex);
394  */
395  } else {
396  try {
397  UInt_t cellIndex = cell->index();
398  UInt_t cellRunNumber = cell->runNumber();
399  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
400  UInt_t cellFolderId = cell->folderId();
401  UInt_t cellFUProcId = cell->fuProcessId();
402  UInt_t cellFUGuid = cell->fuGuid();
403  UChar_t *cellPayloadAddr = cell->payloadAddr();
404  UInt_t cellEventSize = cell->eventSize();
405  sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
406  cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
407  cellEventSize);
408  //shmBuffer_->finishReadingDqmCell(cell);
409  } catch (xcept::Exception& e) {
410  LOG4CPLUS_FATAL(
411  log_,
412  "Failed to send DQM DATA to StorageManager: "
413  << xcept::stdformat_exception_history(e));
414  reschedule = false;
415  }
416  }
417 
418  return reschedule;
419 }
420 
421 //______________________________________________________________________________
422 bool FUResourceQueue::sendDqmWhileHalting(/*toolbox::task::WorkLoop* wl*/) {
423  bool reschedule = true;
424 
425  //FUShmDqmCell* cell = shmBuffer_->dqmCellToRead();
426  //dqm::State_t state = shmBuffer_->dqmState(cell->index());
427 
429 
430  bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
431  if (!rcvSuccess) {
432  cout << "RCV failed!" << endl;
433  ::sleep(5);
434  return reschedule;
435  }
436  FUShmDqmCell* cell = dqmMsg.dqmCell();
437 
438  // concept add stop messages (there is no more "state")
439  //if (state == dqm::EMPTY) {
440  if (false) {
441  LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
442  cout << "shut down dqm workloop " << endl;
443  //UInt_t cellIndex = cell->index();
444  /*
445  shmBuffer_->finishReadingDqmCell(cell);
446  shmBuffer_->discardDqmCell(cellIndex);
447  */
448  reschedule = false;
449  } else if (/*isHalting_*/true) {
450  //UInt_t cellIndex = cell->index();
451  /*
452  shmBuffer_->finishReadingDqmCell(cell);
453  shmBuffer_->discardDqmCell(cellIndex);
454  */
455  } else {
456  try {
457  UInt_t cellIndex = cell->index();
458  UInt_t cellRunNumber = cell->runNumber();
459  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
460  UInt_t cellFolderId = cell->folderId();
461  UInt_t cellFUProcId = cell->fuProcessId();
462  UInt_t cellFUGuid = cell->fuGuid();
463  UChar_t *cellPayloadAddr = cell->payloadAddr();
464  UInt_t cellEventSize = cell->eventSize();
465  sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
466  cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
467  cellEventSize);
468  //shmBuffer_->finishReadingDqmCell(cell);
469  } catch (xcept::Exception& e) {
470  LOG4CPLUS_FATAL(
471  log_,
472  "Failed to send DQM DATA to StorageManager: "
473  << xcept::stdformat_exception_history(e));
474  reschedule = false;
475  }
476  }
477 
478  return reschedule;
479 }
480 
481 // work loop to discard events to builder unit
482 //______________________________________________________________________________
483 bool FUResourceQueue::discard(/*toolbox::task::WorkLoop* wl*/) {
484 
485  bool reschedule = true;
486 
487  /*
488  * DISCARDING raw msg buffers
489  */
490  MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
491  bool rcvSuccess = msq_.rcvQuiet(discardRaw);
492 
493  if (!rcvSuccess) {
494  cout << "RCV failed!" << endl;
495  ::sleep(5);
496  return reschedule;
497  }
498 
499  unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
500  unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
501  + sizeof(unsigned int));
502 
503  unsigned int buResourceId = *pBuID;
504  unsigned int fuResourceId = *pFuID;
505 
506  cout << "Discard received for buResourceID: " << buResourceId
507  << " fuResourceID " << fuResourceId << endl << endl;
508 
509  //FUShmRawCell* cell = shmBuffer_->rawCellToDiscard();
510  //evt::State_t state = shmBuffer_->evtState(cell->index());
511 
512  /*
513  bool shutDown = (state == evt::STOP);
514  bool isLumi = (state == evt::LUMISECTION);
515  */
516  //UInt_t fuResourceId = cell->fuResourceId();
517  //UInt_t buResourceId = cell->buResourceId();
518 
519  // cout << "discard loop, state, shutDown, isLumi " << state << " "
520  // << shutDown << " " << isLumi << endl;
521  // cout << "resource ids " << fuResourceId << " " << buResourceId << endl;
522 
523  /*
524  if (shutDown) {
525  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
526  if (nbClientsToShutDown_ > 0)
527  --nbClientsToShutDown_;
528  if (nbClientsToShutDown_ == 0) {
529  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
530  isActive_ = false;
531  reschedule = false;
532  }
533  }
534  */
535 
536  //shmBuffer_->discardRawCell(cell);
537 
538  //if (!shutDown && !isLumi) {
539  if (true) {
540  // (false = no shmdt)
541  resources_[fuResourceId]->release(false);
542  // also release space in RawCache
543  RawCache::getInstance()->releaseMsg(fuResourceId);
544 
545  lock();
546  freeResourceIds_.push(fuResourceId);
547  assert(freeResourceIds_.size() <= resources_.size());
548  unlock();
549 
550  if (true) {
551  sendDiscard(buResourceId);
552  if (true)
553  sendAllocate();
554  }
555  }
556 
557  // concept shutdown cycle
558  /*
559  if (!reschedule) {
560  cout << " entered shutdown cycle " << endl;
561  shmBuffer_->writeRecoEmptyEvent();
562  UInt_t count = 0;
563  while (count < 100) {
564  cout << " shutdown cycle " << shmBuffer_->nClients() << " "
565  << FUShmBuffer::shm_nattch(shmBuffer_->shmid())
566  << endl;
567  if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
568  shmBuffer_->shmid()) == 1) {
569  // isReadyToShutDown_ = true;
570  break;
571  } else {
572  count++;
573  cout << " shutdown cycle attempt " << count << endl;
574  LOG4CPLUS_DEBUG(
575  log_,
576  "FUResourceTable: Wait for all clients to detach,"
577  << " nClients=" << shmBuffer_->nClients()
578  << " nattch=" << FUShmBuffer::shm_nattch(
579  shmBuffer_->shmid()) << " (" << count << ")");
580  ::usleep(shutdownTimeout_);
581  if (count * shutdownTimeout_ > 10000000)
582  LOG4CPLUS_WARN(
583  log_,
584  "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
585  << " nClients=" << shmBuffer_->nClients()
586  << " nattch=" << FUShmBuffer::shm_nattch(
587  shmBuffer_->shmid()) << " (" << count
588  << ")");
589 
590  }
591  }
592  bool allEmpty = false;
593  cout << "Checking if all dqm cells are empty " << endl;
594  while (!allEmpty) {
595  UInt_t n = nbDqmCells_;
596  allEmpty = true;
597  shmBuffer_->lock();
598  for (UInt_t i = 0; i < n; i++) {
599  dqm::State_t state = shmBuffer_->dqmState(i);
600  if (state != dqm::EMPTY)
601  allEmpty = false;
602  }
603  shmBuffer_->unlock();
604  }
605  cout << "Making sure there are no dqm pending discards "
606  << endl;
607  if (nbPendingSMDqmDiscards_ != 0) {
608  LOG4CPLUS_WARN(
609  log_,
610  "FUResourceTable: pending DQM discards not zero: ="
611  << nbPendingSMDqmDiscards_
612  << " while cells are all empty. This may cause problems at next start ");
613 
614  }
615  shmBuffer_->writeDqmEmptyEvent();
616  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
617  // sendDqm loop has been shut down as well
618  }
619  */
620 
621  return reschedule;
622 }
623 
624 //______________________________________________________________________________
625 bool FUResourceQueue::discardWhileHalting(bool sendDiscards) {
626 
627  bool reschedule = true;
628 
629  /*
630  * DISCARDING raw msg buffers
631  */
632  MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
633  bool rcvSuccess = msq_.rcvQuiet(discardRaw);
634 
635  if (!rcvSuccess) {
636  cout << "RCV failed!" << endl;
637  ::sleep(5);
638  return reschedule;
639  }
640 
641  unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
642  unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
643  + sizeof(unsigned int));
644 
645  unsigned int buResourceId = *pBuID;
646  unsigned int fuResourceId = *pFuID;
647 
648  cout << "Discard received for buResourceID: " << buResourceId
649  << " fuResourceID " << fuResourceId << endl << endl;
650 
651  //FUShmRawCell* cell = shmBuffer_->rawCellToDiscard();
652  //evt::State_t state = shmBuffer_->evtState(cell->index());
653 
654  /*
655  bool shutDown = (state == evt::STOP);
656  bool isLumi = (state == evt::LUMISECTION);
657  */
658  //UInt_t fuResourceId = cell->fuResourceId();
659  //UInt_t buResourceId = cell->buResourceId();
660 
661  // cout << "discard loop, state, shutDown, isLumi " << state << " "
662  // << shutDown << " " << isLumi << endl;
663  // cout << "resource ids " << fuResourceId << " " << buResourceId << endl;
664 
665  /*
666  if (shutDown) {
667  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
668  if (nbClientsToShutDown_ > 0)
669  --nbClientsToShutDown_;
670  if (nbClientsToShutDown_ == 0) {
671  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
672  isActive_ = false;
673  reschedule = false;
674  }
675  }
676  */
677 
678  //shmBuffer_->discardRawCell(cell);
679 
680  //if (!shutDown && !isLumi) {
681  if (true) {
682  // (false = no shmdt)
683  resources_[fuResourceId]->release(false);
684  // also release space in RawCache
685  RawCache::getInstance()->releaseMsg(fuResourceId);
686 
687  lock();
688  freeResourceIds_.push(fuResourceId);
689  assert(freeResourceIds_.size() <= resources_.size());
690  unlock();
691 
692  if (false) {
693  sendDiscard(buResourceId);
694  if (false)
695  sendAllocate();
696  }
697  }
698 
699  // concept shutdown cycle
700  /*
701  if (!reschedule) {
702  cout << " entered shutdown cycle " << endl;
703  shmBuffer_->writeRecoEmptyEvent();
704  UInt_t count = 0;
705  while (count < 100) {
706  cout << " shutdown cycle " << shmBuffer_->nClients() << " "
707  << FUShmBuffer::shm_nattch(shmBuffer_->shmid())
708  << endl;
709  if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
710  shmBuffer_->shmid()) == 1) {
711  // isReadyToShutDown_ = true;
712  break;
713  } else {
714  count++;
715  cout << " shutdown cycle attempt " << count << endl;
716  LOG4CPLUS_DEBUG(
717  log_,
718  "FUResourceTable: Wait for all clients to detach,"
719  << " nClients=" << shmBuffer_->nClients()
720  << " nattch=" << FUShmBuffer::shm_nattch(
721  shmBuffer_->shmid()) << " (" << count << ")");
722  ::usleep(shutdownTimeout_);
723  if (count * shutdownTimeout_ > 10000000)
724  LOG4CPLUS_WARN(
725  log_,
726  "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
727  << " nClients=" << shmBuffer_->nClients()
728  << " nattch=" << FUShmBuffer::shm_nattch(
729  shmBuffer_->shmid()) << " (" << count
730  << ")");
731 
732  }
733  }
734  bool allEmpty = false;
735  cout << "Checking if all dqm cells are empty " << endl;
736  while (!allEmpty) {
737  UInt_t n = nbDqmCells_;
738  allEmpty = true;
739  shmBuffer_->lock();
740  for (UInt_t i = 0; i < n; i++) {
741  dqm::State_t state = shmBuffer_->dqmState(i);
742  if (state != dqm::EMPTY)
743  allEmpty = false;
744  }
745  shmBuffer_->unlock();
746  }
747  cout << "Making sure there are no dqm pending discards "
748  << endl;
749  if (nbPendingSMDqmDiscards_ != 0) {
750  LOG4CPLUS_WARN(
751  log_,
752  "FUResourceTable: pending DQM discards not zero: ="
753  << nbPendingSMDqmDiscards_
754  << " while cells are all empty. This may cause problems at next start ");
755 
756  }
757  shmBuffer_->writeDqmEmptyEvent();
758  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
759  // sendDqm loop has been shut down as well
760  }
761  */
762 
763  return reschedule;
764 }
765 
766 // process buffer received via I2O_FU_TAKE message
767 //______________________________________________________________________________
768 
769 // BUILD RESOURCE (RAW CELL WRITING)
770 
772 
773  bool eventComplete = false;
774 
775  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
776  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
777 
778  UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
779  UInt_t buResourceId = (UInt_t) block->buResourceId;
780 
781  FUResource* resource = resources_[fuResourceId];
782 
783  RawMsgBuf* currentMessageToWrite = cache_->getMsgToWrite();
784 
785  if (!resource->fatalError() && !resource->isAllocated()) {
786  FUShmRawCell* rawCell = currentMessageToWrite->rawCell();
787  rawCell->initialize(fuResourceId);
788  resource->allocate(rawCell);
789 
790  timeval now;
791  gettimeofday(&now, 0);
792 
794  ((uint64_t) (now.tv_sec) << 32) + (uint64_t) (now.tv_usec));
795 
797 
798  if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
799  resource->doCrcCheck(true);
800  else
801  resource->doCrcCheck(false);
802  }
803 
804  // keep building this resource if it is healthy
805  if (!resource->fatalError()) {
806  resource->process(bufRef);
807 
808  lock();
809  nbErrors_ += resource->nbErrors();
810  nbCrcErrors_ += resource->nbCrcErrors();
811  unlock();
812 
813  // make resource available for pick-up
814  if (resource->isComplete()) {
815  lock();
816  nbCompleted_++;
817  nbPending_--;
818  unlock();
819  /*
820  cout << "POSTING to q: msqid = " << msq_.id()
821  << ", message buf is allocated for: "
822  << currentMessageToWrite->msize()
823  << "... Actually copied: "
824  << currentMessageToWrite->usedSize() << " fuResourceID = "
825  << currentMessageToWrite->rawCell()->fuResourceId()
826  << " buResourceID = "
827  << currentMessageToWrite->rawCell()->buResourceId() << endl;
828  */
829 
830  //msq_.post(*currentMessageToWrite);
831 
832  try {
833  msq_.postLength(*currentMessageToWrite,
834  currentMessageToWrite->usedSize());
835  } catch (...) {
836  string errmsg = "Failed to post message to Queue!";
837  LOG4CPLUS_FATAL(log_, errmsg);
838  XCEPT_RAISE(evf::Exception, errmsg);
839 
840  }
841 
842  // CURRENT RECEIVERS
843  /*
844  vector<int> receivers = msq_.getReceivers();
845  cout << "--Receiving processes: ";
846  for (unsigned int i = 0; i < receivers.size(); ++i)
847  cout << i << " " << receivers[i];
848  cout << endl;
849  */
850 
851  eventComplete = true;
852  }
853 
854  }
855  // bad event, release msg, and the whole resource if this was the last one
856  if (resource->fatalError()) {
857  bool lastMsg = isLastMessageOfEvent(bufRef);
858  if (lastMsg) {
859  resource->release(false);
860  lock();
861  freeResourceIds_.push(fuResourceId);
862  nbDiscarded_++;
863  nbLost_++;
864  nbPending_--;
865  unlock();
866  bu_->sendDiscard(buResourceId);
867  sendAllocate();
868  }
869  bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
870  }
871 
872  return eventComplete;
873 }
874 
875 //concept discardDataEvent still required
876 // process buffer received via I2O_SM_DATA_DISCARD message
877 //______________________________________________________________________________
879 
880  /*
881  I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
882  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
883  UInt_t recoIndex = msg->rbBufferID;
884 
885  if (acceptSMDataDiscard_[recoIndex]) {
886  lock();
887  nbPendingSMDiscards_--;
888  unlock();
889  acceptSMDataDiscard_[recoIndex] = false;
890 
891  if (!isHalting_) {
892  shmBuffer_->discardRecoCell(recoIndex);
893  bufRef->release();
894  }
895  } else {
896  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
897  }
898 
899  if (isHalting_) {
900  bufRef->release();
901  return false;
902  }
903  */
904  return true;
905 }
906 
907 // process buffer received via I2O_SM_DATA_DISCARD message
908 //______________________________________________________________________________
910 
911  /*
912  I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
913  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
914  UInt_t recoIndex = msg->rbBufferID;
915 
916  if (acceptSMDataDiscard_[recoIndex]) {
917  lock();
918  nbPendingSMDiscards_--;
919  unlock();
920  acceptSMDataDiscard_[recoIndex] = false;
921 
922  if (!isHalting_) {
923  shmBuffer_->discardRecoCell(recoIndex);
924  bufRef->release();
925  }
926  } else {
927  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
928  }
929 
930  if (isHalting_) {
931  bufRef->release();
932  return false;
933  }
934  */
935  return true;
936 }
937 
938 //concept discardDqmEvent still required?
939 // process buffer received via I2O_SM_DQM_DISCARD message
940 //______________________________________________________________________________
942  /*
943  I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
944  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
945  UInt_t dqmIndex = msg->rbBufferID;
946  unsigned int ntries = 0;
947  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
948  LOG4CPLUS_WARN(
949  log_,
950  "DQM discard for cell " << dqmIndex
951  << " which is not yer in SENT state - waiting");
952  ::usleep(10000);
953  if (ntries++ > 10) {
954  LOG4CPLUS_ERROR(
955  log_,
956  "DQM cell " << dqmIndex
957  << " discard timed out while cell still in state "
958  << shmBuffer_->dqmState(dqmIndex));
959  bufRef->release();
960  return true;
961  }
962  }
963  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
964  acceptSMDqmDiscard_[dqmIndex]--;
965  if (nbPendingSMDqmDiscards_ > 0) {
966  nbPendingSMDqmDiscards_--;
967  } else {
968  LOG4CPLUS_WARN (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
969  << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
970  }
971 
972  if (!isHalting_) {
973  shmBuffer_->discardDqmCell(dqmIndex);
974  bufRef->release();
975  }
976 
977  }
978  else {
979  LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
980  << " from StorageManager while cell is not accepting discards");
981  }
982 
983  if (isHalting_) {
984  bufRef->release();
985  return false;
986  }
987  */
988  return true;
989 }
990 
991 // process buffer received via I2O_SM_DQM_DISCARD message
992 //______________________________________________________________________________
994  /*
995  I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
996  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
997  UInt_t dqmIndex = msg->rbBufferID;
998  unsigned int ntries = 0;
999  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
1000  LOG4CPLUS_WARN(
1001  log_,
1002  "DQM discard for cell " << dqmIndex
1003  << " which is not yer in SENT state - waiting");
1004  ::usleep(10000);
1005  if (ntries++ > 10) {
1006  LOG4CPLUS_ERROR(
1007  log_,
1008  "DQM cell " << dqmIndex
1009  << " discard timed out while cell still in state "
1010  << shmBuffer_->dqmState(dqmIndex));
1011  bufRef->release();
1012  return true;
1013  }
1014  }
1015  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
1016  acceptSMDqmDiscard_[dqmIndex]--;
1017  if (nbPendingSMDqmDiscards_ > 0) {
1018  nbPendingSMDqmDiscards_--;
1019  } else {
1020  LOG4CPLUS_WARN (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
1021  << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
1022  }
1023 
1024  if (!isHalting_) {
1025  shmBuffer_->discardDqmCell(dqmIndex);
1026  bufRef->release();
1027  }
1028 
1029  }
1030  else {
1031  LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
1032  << " from StorageManager while cell is not accepting discards");
1033  }
1034 
1035  if (isHalting_) {
1036  bufRef->release();
1037  return false;
1038  }
1039  */
1040  return true;
1041 }
1042 
1043 //concept add message type, post end-of-ls event to msq
1044 //______________________________________________________________________________
1046  /*
1047  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
1048  *msg =
1049  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
1050  //make sure to fill up the shmem so no process will miss it
1051  // but processes will have to handle duplicates
1052 
1053  for (unsigned int i = 0; i < nbRawCells_; i++)
1054  shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
1055  */
1056 }
1057 
1058 //concept dropEvent required?
1059 //______________________________________________________________________________
1061  /*
1062  FUShmRawCell* cell = shmBuffer_->rawCellToRead();
1063  UInt_t fuResourceId = cell->fuResourceId();
1064  shmBuffer_->finishReadingRawCell(cell);
1065  shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
1066  */
1067 }
1068 
1069 //concept switch to error message send
1070 //______________________________________________________________________________
1072  bool retval = false;
1073  /*
1074  vector<pid_t> pids = cellPrcIds();
1075  UInt_t iRawCell = pids.size();
1076  for (UInt_t i = 0; i < pids.size(); i++) {
1077  if (pid == pids[i]) {
1078  iRawCell = i;
1079  break;
1080  }
1081  }
1082 
1083  if (iRawCell < pids.size()) {
1084  shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell);
1085  retval = true;
1086  } else
1087  LOG4CPLUS_WARN(log_,
1088  "No raw data to send to error stream for process " << pid);
1089  shmBuffer_->removeClientPrcId(pid);
1090  */
1091  return retval;
1092 }
1093 
1094 //concept RAW with nothing in it
1095 //______________________________________________________________________________
1097  isReadyToShutDown_ = true;
1098  /*
1099  nbClientsToShutDown_ = nbClients();
1100  isReadyToShutDown_ = false;
1101 
1102  if (nbClientsToShutDown_ == 0) {
1103  LOG4CPLUS_INFO(
1104  log_,
1105  "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
1106  UInt_t n = nbResources();
1107  for (UInt_t i = 0; i < n; i++) {
1108  evt::State_t state = shmBuffer_->evtState(i);
1109  if (state != evt::EMPTY) {
1110  LOG4CPLUS_WARN(
1111  log_,
1112  "Schedule discard at STOP for orphaned event in state "
1113  << state);
1114  shmBuffer_->scheduleRawCellForDiscardServerSide(i);
1115  }
1116  }
1117  shmBuffer_->scheduleRawEmptyCellForDiscard();
1118  } else {
1119  UInt_t n = nbClientsToShutDown_;
1120  for (UInt_t i = 0; i < n; ++i)
1121  shmBuffer_->writeRawEmptyEvent();
1122  }
1123  */
1124 }
1125 
1126 //______________________________________________________________________________
1128  for (UInt_t i = 0; i < resources_.size(); i++) {
1129  resources_[i]->release(false);
1130  delete resources_[i];
1131  }
1132  resources_.clear();
1133 
1134  while (!freeResourceIds_.empty())
1135  freeResourceIds_.pop();
1136 }
1137 
1139 // implementation of private member functions
1141 
1142 //concept adapt resetCounters
1143 //______________________________________________________________________________
1145  /*
1146  if (0 != shmBuffer_) {
1147  for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
1148  acceptSMDataDiscard_[i] = false;
1149  for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
1150  acceptSMDqmDiscard_[i] = 0;
1151  }
1152  */
1154  nbCompleted_ = 0;
1155  nbSent_ = 0;
1156  nbSentError_ = 0;
1157  nbSentDqm_ = 0;
1160  nbDiscarded_ = 0;
1161  nbLost_ = 0;
1162 
1163  nbErrors_ = 0;
1164  nbCrcErrors_ = 0;
1165  nbAllocSent_ = 0;
1166 
1167  sumOfSquares_ = 0;
1168  sumOfSizes_ = 0;
1169  //isStopping_ = false;
1170 
1171 }
1172 
1173 //concept adapt nbClients
1174 //______________________________________________________________________________
1176  UInt_t result(0);
1177 
1178  /*
1179  if (0 != shmBuffer_)
1180  result = shmBuffer_->nClients();
1181  */
1182  return result;
1183 }
1184 
1185 //concept adapt clientPrcIds
1186 //______________________________________________________________________________
1187 vector<pid_t> FUResourceQueue::clientPrcIds() const {
1188  vector<pid_t> result;
1189 
1190  /*
1191  if (0 != shmBuffer_) {
1192  UInt_t n = nbClients();
1193  for (UInt_t i = 0; i < n; i++)
1194  result.push_back(shmBuffer_->clientPrcId(i));
1195  }
1196  */
1197  return result;
1198 }
1199 
1200 //concept adapt clientPrcIdsAsString
1201 //______________________________________________________________________________
1203  stringstream ss;
1204 
1205  /*
1206  if (0 != shmBuffer_) {
1207  UInt_t n = nbClients();
1208  for (UInt_t i = 0; i < n; i++) {
1209  if (i > 0)
1210  ss << ",";
1211  ss << shmBuffer_->clientPrcId(i);
1212  }
1213  }
1214  */
1215  return ss.str();
1216 }
1217 
1218 //concept adapt cellStates
1219 //______________________________________________________________________________
1220 vector<string> FUResourceQueue::cellStates() const {
1221  vector<string> result;
1222  /*
1223  if (0 != shmBuffer_) {
1224  UInt_t n = nbResources();
1225  shmBuffer_->lock();
1226  for (UInt_t i = 0; i < n; i++) {
1227  evt::State_t state = shmBuffer_->evtState(i);
1228  if (state == evt::EMPTY)
1229  result.push_back("EMPTY");
1230  else if (state == evt::STOP)
1231  result.push_back("STOP");
1232  else if (state == evt::LUMISECTION)
1233  result.push_back("LUMISECTION");
1234  else if (state == evt::RAWWRITING)
1235  result.push_back("RAWWRITING");
1236  else if (state == evt::RAWWRITTEN)
1237  result.push_back("RAWWRITTEN");
1238  else if (state == evt::RAWREADING)
1239  result.push_back("RAWREADING");
1240  else if (state == evt::RAWREAD)
1241  result.push_back("RAWREAD");
1242  else if (state == evt::PROCESSING)
1243  result.push_back("PROCESSING");
1244  else if (state == evt::PROCESSED)
1245  result.push_back("PROCESSED");
1246  else if (state == evt::RECOWRITING)
1247  result.push_back("RECOWRITING");
1248  else if (state == evt::RECOWRITTEN)
1249  result.push_back("RECOWRITTEN");
1250  else if (state == evt::SENDING)
1251  result.push_back("SENDING");
1252  else if (state == evt::SENT)
1253  result.push_back("SENT");
1254  else if (state == evt::DISCARDING)
1255  result.push_back("DISCARDING");
1256  }
1257  shmBuffer_->unlock();
1258  }
1259  */
1260  return result;
1261 }
1262 
1263 //concept adapt dqmCellStates
1264 vector<string> FUResourceQueue::dqmCellStates() const {
1265  vector<string> result;
1266 
1267  /*
1268  if (0 != shmBuffer_) {
1269  UInt_t n = nbDqmCells_;
1270  shmBuffer_->lock();
1271  for (UInt_t i = 0; i < n; i++) {
1272  dqm::State_t state = shmBuffer_->dqmState(i);
1273  if (state == dqm::EMPTY)
1274  result.push_back("EMPTY");
1275  else if (state == dqm::WRITING)
1276  result.push_back("WRITING");
1277  else if (state == dqm::WRITTEN)
1278  result.push_back("WRITTEN");
1279  else if (state == dqm::SENDING)
1280  result.push_back("SENDING");
1281  else if (state == dqm::SENT)
1282  result.push_back("SENT");
1283  else if (state == dqm::DISCARDING)
1284  result.push_back("DISCARDING");
1285  }
1286  shmBuffer_->unlock();
1287  }
1288  */
1289  return result;
1290 }
1291 
1292 //concept adapt cellEvtNumbers
1293 //______________________________________________________________________________
1294 vector<UInt_t> FUResourceQueue::cellEvtNumbers() const {
1295  vector<UInt_t> result;
1296 
1297  /*
1298  if (0 != shmBuffer_) {
1299  UInt_t n = nbResources();
1300  shmBuffer_->lock();
1301  for (UInt_t i = 0; i < n; i++)
1302  result.push_back(shmBuffer_->evtNumber(i));
1303  shmBuffer_->unlock();
1304  }
1305  */
1306  return result;
1307 }
1308 
1309 //concept adapt cellPrcIds
1310 //______________________________________________________________________________
1311 vector<pid_t> FUResourceQueue::cellPrcIds() const {
1312  vector<pid_t> result;
1313 
1314  /*
1315  if (0 != shmBuffer_) {
1316  UInt_t n = nbResources();
1317  shmBuffer_->lock();
1318  for (UInt_t i = 0; i < n; i++)
1319  result.push_back(shmBuffer_->evtPrcId(i));
1320  shmBuffer_->unlock();
1321  }
1322  */
1323  return result;
1324 }
1325 
1326 //concept adapt cellTimeStamps
1327 //______________________________________________________________________________
1328 vector<time_t> FUResourceQueue::cellTimeStamps() const {
1329  vector<time_t> result;
1330 
1331  /*
1332  if (0 != shmBuffer_) {
1333  UInt_t n = nbResources();
1334  shmBuffer_->lock();
1335  for (UInt_t i = 0; i < n; i++)
1336  result.push_back(shmBuffer_->evtTimeStamp(i));
1337  shmBuffer_->unlock();
1338  }
1339  */
1340  return result;
1341 }
1342 
1344 // implementation of private member functions
1346 
1347 //concept adapt lastResort
1349 
1350  /*
1351  cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1352  << " more rawcells to read " << endl;
1353  while (shmBuffer_->nbRawCellsToRead() != 0) {
1354  FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
1355  cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1356  << endl;
1357  shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
1358  cout << "lastResort: schedule raw cell for discard" << endl;
1359  }
1360  */
1361 }
1362 
unsigned int index() const
Definition: FUShmRecoCell.h:22
unsigned int fuGuid() const
Definition: FUShmDqmCell.h:27
int postLength(MsgBuf &ptr, unsigned int length)
Definition: MasterQueue.cc:36
int i
Definition: DBlmapReader.cc:9
bool handleCrashedEP(UInt_t runNumber, pid_t pid)
void process(MemRef_t *bufRef)
Definition: FUResource.cc:126
void sendDiscard(UInt_t buResourceId)
Definition: IPCMethod.cc:150
unsigned int type() const
Definition: FUShmRecoCell.h:29
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)
unsigned int index() const
Definition: FUShmDqmCell.h:22
bool fatalError() const
Definition: FUResource.h:110
unsigned int runNumber() const
Definition: FUShmRecoCell.h:24
uint64_t sumOfSquares_
Definition: IPCMethod.h:381
std::vector< pid_t > clientPrcIds() const
unsigned int fuGuid() const
Definition: FUShmRecoCell.h:28
UInt_t doCrcCheck_
Definition: IPCMethod.h:355
void setRBEventCount(uint32_t evtcnt)
unsigned int folderId() const
Definition: FUShmDqmCell.h:25
void sendDiscard(UInt_t buResourceId)
Definition: BUProxy.cc:108
UInt_t nbErrors_
Definition: IPCMethod.h:377
UInt_t nbDiscarded_
Definition: IPCMethod.h:367
UInt_t nbErrors(bool reset=true)
Definition: FUResource.cc:308
void sendAllocate()
Definition: IPCMethod.cc:111
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
std::ostream & logger()
Definition: fwLog.cc:41
static const unsigned int DISCARD_RAW_MESSAGE_TYPE
Definition: msq_constants.h:19
toolbox::mem::Reference MemRef_t
Definition: FUTypes.h:10
unsigned int evtAtUpdate() const
Definition: FUShmDqmCell.h:24
bool isReadyToShutDown_
Definition: IPCMethod.h:374
std::vector< std::string > cellStates() const
bool isLastMessageOfEvent(MemRef_t *bufRef)
Definition: IPCMethod.cc:248
log4cplus::Logger log_
Definition: IPCMethod.h:342
unsigned int fuProcessId() const
Definition: FUShmRecoCell.h:27
void release(bool detachResource)
Definition: FUResource.cc:82
UInt_t nbSentDqm_
Definition: IPCMethod.h:364
FUShmRawCell * rawCell()
Definition: RawMsgBuf.h:35
void sleep(Duration_t)
Definition: Utils.h:163
UInt_t nbClients() const
RawMsgBuf * getMsgToWrite()
Definition: RawCache.cc:61
unsigned char * payloadAddr() const
unsigned int evtNumber() const
Definition: FUShmRecoCell.h:25
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
unsigned int nExpectedEPs() const
Definition: FUShmRecoCell.h:34
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
unsigned int fuProcessId() const
Definition: FUShmDqmCell.h:26
void initialize(unsigned int index)
Definition: FUShmRawCell.cc:73
static RawCache * getInstance()
Definition: RawCache.cc:25
std::vector< time_t > cellTimeStamps() const
unsigned int runNumber() const
Definition: FUShmDqmCell.h:23
UInt_t nbCompleted_
Definition: IPCMethod.h:361
FUResourceQueue(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *)
UInt_t nbPending_
Definition: IPCMethod.h:360
unsigned char UChar_t
Definition: FUTypes.h:14
void clear(CLHEP::HepGenMatrix &m)
Helper function: Reset all elements of a matrix to 0.
Definition: matutil.cc:168
FUShmDqmCell * dqmCell()
Definition: DQMMsgBuf.h:37
tuple result
Definition: query.py:137
void postEndOfLumiSection(MemRef_t *bufRef)
UInt_t nbSentError_
Definition: IPCMethod.h:363
UInt_t nbCrcErrors_
Definition: IPCMethod.h:378
bool discardDqmEventWhileHalting(MemRef_t *bufRef)
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:224
FUResourceVec_t resources_
Definition: IPCMethod.h:393
static const unsigned int RECO_MESSAGE_TYPE
Definition: msq_constants.h:15
bool isComplete() const
Definition: FUResource.h:221
void releaseMsg(unsigned int fuResourceId)
Definition: RawCache.cc:77
UInt_t runNumber_
Definition: IPCMethod.h:384
UInt_t nbLost_
Definition: IPCMethod.h:368
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned char * payloadAddr() const
Definition: FUShmDqmCell.cc:54
unsigned int usedSize()
Definition: RawMsgBuf.h:41
unsigned long long uint64_t
Definition: Time.h:15
bool isAllocated() const
Definition: FUResource.h:113
bool buildResource(MemRef_t *bufRef)
unsigned int eventSize() const
Definition: FUShmDqmCell.h:31
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
EvffedFillerRB * frb_
Definition: IPCMethod.h:390
void doCrcCheck(bool doCrcCheck)
Definition: FUResource.h:104
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:186
UInt_t nbAllocSent_
Definition: IPCMethod.h:379
UInt_t sumOfSizes_
Definition: IPCMethod.h:382
std::string clientPrcIdsAsString() const
unsigned int outModId() const
Definition: FUShmRecoCell.h:26
bool discardDataEvent(MemRef_t *bufRef)
bool discardDqmEvent(MemRef_t *bufRef)
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
bool discardDataEventWhileHalting(MemRef_t *bufRef)
UInt_t nbSent_
Definition: IPCMethod.h:362
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:171
std::vector< std::string > dqmCellStates() const
UInt_t nbAllocated_
Definition: IPCMethod.h:359
FUShmRecoCell * recoCell()
Definition: RecoMsgBuf.h:35
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
UInt_t nbCrcErrors(bool reset=true)
Definition: FUResource.cc:316
void unlock()
Definition: IPCMethod.h:314
void setRBTimeStamp(uint64_t ts)
tuple cout
Definition: gather_cfg.py:121
UInt_t nbSent() const
Definition: IPCMethod.h:216
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
Definition: IPCMethod.cc:156
static const unsigned int DQM_MESSAGE_TYPE
Definition: msq_constants.h:16
std::vector< UInt_t > cellEvtNumbers() const
void allocate(FUShmRawCell *shmCell)
Definition: FUResource.cc:63
BUProxy * bu_
Definition: IPCMethod.h:339
std::vector< pid_t > cellPrcIds() const
bool discardWhileHalting(bool sendDiscards)