CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
List of all members | Public Member Functions | Private Attributes
evf::FUResourceQueue Class Reference

#include <FUResourceQueue.h>

Inheritance diagram for evf::FUResourceQueue:
evf::IPCMethod

Public Member Functions

bool buildResource (MemRef_t *bufRef)
 
std::vector< UInt_t > cellEvtNumbers () const
 
std::vector< pid_t > cellPrcIds () const
 
std::vector< std::string > cellStates () const
 
std::vector< time_t > cellTimeStamps () const
 
void clear ()
 
std::vector< pid_t > clientPrcIds () const
 
std::string clientPrcIdsAsString () const
 
bool discard ()
 
bool discardDataEvent (MemRef_t *bufRef)
 
bool discardDataEventWhileHalting (MemRef_t *bufRef)
 
bool discardDqmEvent (MemRef_t *bufRef)
 
bool discardDqmEventWhileHalting (MemRef_t *bufRef)
 
bool discardWhileHalting (bool sendDiscards)
 
std::vector< std::string > dqmCellStates () const
 
void dropEvent ()
 
void dumpEvent (evf::FUShmRawCell *cell)
 
 FUResourceQueue (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int, EvffedFillerRB *frb, xdaq::Application *) throw (evf::Exception)
 
bool handleCrashedEP (UInt_t runNumber, pid_t pid)
 
void initialize (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception)
 
void lastResort ()
 
UInt_t nbClients () const
 
UInt_t nbResources () const
 
void postEndOfLumiSection (MemRef_t *bufRef)
 
void resetCounters ()
 
void resetIPC ()
 resets the underlying IPC method to the initial state More...
 
bool sendData ()
 
bool sendDataWhileHalting ()
 
bool sendDqm ()
 
bool sendDqmWhileHalting ()
 
void shutDownClients ()
 
virtual ~FUResourceQueue ()
 
- Public Member Functions inherited from evf::IPCMethod
UInt_t allocateResource ()
 
void dumpEvent (evf::FUShmRawCell *cell)
 
void injectCRCError ()
 
 IPCMethod (bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int timeout, EvffedFillerRB *frb, xdaq::Application *app) throw (evf::Exception)
 
bool isActive () const
 
bool isLastMessageOfEvent (MemRef_t *bufRef)
 
bool isReadyToShutDown () const
 
void lock ()
 
UInt_t nbAllocated () const
 
UInt_t nbAllocSent () const
 
UInt_t nbCompleted () const
 
UInt_t nbCrcErrors () const
 
UInt_t nbDiscarded () const
 
UInt_t nbEolDiscarded () const
 
UInt_t nbEolPosted () const
 
UInt_t nbErrors () const
 
UInt_t nbFreeSlots () const
 
UInt_t nbLost () const
 
UInt_t nbPending () const
 
UInt_t nbPendingSMDiscards () const
 
int nbPendingSMDqmDiscards () const
 
UInt_t nbSent () const
 
UInt_t nbSentDqm () const
 
UInt_t nbSentError () const
 
virtual std::string printStatus ()
 
void releaseResources ()
 releases all FUResource's More...
 
void resetPendingAllocates ()
 resets free resources to the maximum number More...
 
void sendAllocate ()
 
void sendDataEvent (UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
 
void sendDiscard (UInt_t buResourceId)
 
void sendDqmEvent (UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
 
void sendErrorEvent (UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
 
void sendInitMessage (UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
 
void setActive (bool activeValue)
 
void setDoCrcCheck (UInt_t doCrcCheck)
 
void setDoDumpEvents (UInt_t doDumpEvents)
 
void setReadyToShutDown (bool readyValue)
 
void setRunNumber (UInt_t runNumber)
 
void setStopFlag (bool status)
 
UInt_t shutdownStatus ()
 
UInt_t sumOfSizes () const
 
uint64_t sumOfSquares () const
 
void unlock ()
 
virtual ~IPCMethod ()
 

Private Attributes

RawCache * cache_
 
UInt_t dqmCellSize_
 
MasterQueue msq_
 
UInt_t rawCellSize_
 
UInt_t recoCellSize_
 

Additional Inherited Members

- Protected Attributes inherited from evf::IPCMethod
bool * acceptSMDataDiscard_
 
int * acceptSMDqmDiscard_
 
xdaq::Application * app_
 
BUProxy * bu_
 
UInt_t doCrcCheck_
 
UInt_t doDumpEvents_
 
EvffedFillerRB * frb_
 
std::queue< UInt_t > freeResourceIds_
 
unsigned int freeResRequiredForAllocate_
 
bool isActive_
 
bool isReadyToShutDown_
 
sem_t lock_
 
log4cplus::Logger log_
 
UInt_t nbAllocated_
 
UInt_t nbAllocSent_
 
UInt_t nbClientsToShutDown_
 
UInt_t nbCompleted_
 
UInt_t nbCrcErrors_
 
UInt_t nbDiscarded_
 
UInt_t nbDqmCells_
 
UInt_t nbEolDiscarded_
 
UInt_t nbEolPosted_
 
UInt_t nbErrors_
 
UInt_t nbLost_
 
UInt_t nbPending_
 
UInt_t nbPendingSMDiscards_
 
std::atomic< int > nbPendingSMDqmDiscards_
 
UInt_t nbRawCells_
 
UInt_t nbRecoCells_
 
UInt_t nbSent_
 
UInt_t nbSentDqm_
 
UInt_t nbSentError_
 
FUResourceVec_t resources_
 
UInt_t runNumber_
 
UInt_t shutdownStatus_
 
unsigned int shutdownTimeout_
 
SMProxy * sm_
 
bool stopFlag_
 
UInt_t sumOfSizes_
 
uint64_t sumOfSquares_
 

Detailed Description

Definition at line 31 of file FUResourceQueue.h.

Constructor & Destructor Documentation

FUResourceQueue::FUResourceQueue ( bool  segmentationMode,
UInt_t  nbRawCells,
UInt_t  nbRecoCells,
UInt_t  nbDqmCells,
UInt_t  rawCellSize,
UInt_t  recoCellSize,
UInt_t  dqmCellSize,
int  freeResReq,
BUProxy * bu,
SMProxy * sm,
log4cplus::Logger  logger,
unsigned int  timeout,
EvffedFillerRB * frb,
xdaq::Application *  app 
)
throw (evf::Exception)

Definition at line 40 of file FUResourceQueue.cc.

44  :
45  IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
46  rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
47  logger, timeout, frb, app), msq_(99) {
48  //improve fix UInt_t and msq_(99)
49 
50  initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
51  rawCellSize, recoCellSize, dqmCellSize);
52 
53 }
void initialize(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize)
std::ostream & logger()
Definition: fwLog.cc:41
IPCMethod(bool segmentationMode, UInt_t nbRawCells, UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize, UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu, SMProxy *sm, log4cplus::Logger logger, unsigned int timeout, EvffedFillerRB *frb, xdaq::Application *app)
Definition: IPCMethod.cc:28
FUResourceQueue::~FUResourceQueue ( )
virtual

Definition at line 56 of file FUResourceQueue.cc.

References clear(), evf::MasterQueue::disconnect(), evf::IPCMethod::log_, and msq_.

56  {
57  clear();
58 
59  // disconnect from queue
60  if (msq_.disconnect() == 0)
61  LOG4CPLUS_INFO(log_, "MESSAGE QUEUE SUCCESSFULLY RELEASED.");
62 
63  // needed??
64  /*
65  if (0 != acceptSMDataDiscard_)
66  delete[] acceptSMDataDiscard_;
67  if (0 != acceptSMDqmDiscard_)
68  delete[] acceptSMDqmDiscard_;
69  */
70 }
log4cplus::Logger log_
Definition: IPCMethod.h:342

Member Function Documentation

bool FUResourceQueue::buildResource ( MemRef_t * bufRef)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 771 of file FUResourceQueue.cc.

References evf::FUResource::allocate(), createPayload::block, evf::IPCMethod::bu_, cache_, evf::FUResource::doCrcCheck(), evf::IPCMethod::doCrcCheck_, evf::FUResource::fatalError(), evf::IPCMethod::frb_, evf::IPCMethod::freeResourceIds_, evf::RawCache::getMsgToWrite(), evf::FUShmRawCell::initialize(), evf::FUResource::isAllocated(), evf::FUResource::isComplete(), evf::IPCMethod::isLastMessageOfEvent(), evf::IPCMethod::lock(), evf::IPCMethod::log_, msq_, evf::IPCMethod::nbAllocated_, evf::IPCMethod::nbCompleted_, evf::FUResource::nbCrcErrors(), evf::IPCMethod::nbCrcErrors_, evf::IPCMethod::nbDiscarded_, evf::FUResource::nbErrors(), evf::IPCMethod::nbErrors_, evf::IPCMethod::nbLost_, evf::IPCMethod::nbPending_, cmsPerfSuiteHarvest::now, evf::MasterQueue::postLength(), evf::FUResource::process(), evf::RawMsgBuf::rawCell(), evf::FUResource::release(), evf::IPCMethod::resources_, evf::IPCMethod::sendAllocate(), evf::BUProxy::sendDiscard(), evf::EvffedFillerRB::setRBEventCount(), evf::EvffedFillerRB::setRBTimeStamp(), evf::IPCMethod::unlock(), and evf::RawMsgBuf::usedSize().

771  {
772 
773  bool eventComplete = false;
774 
775  I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
776  (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
777 
778  UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
779  UInt_t buResourceId = (UInt_t) block->buResourceId;
780 
781  FUResource* resource = resources_[fuResourceId];
782 
783  RawMsgBuf* currentMessageToWrite = cache_->getMsgToWrite();
784 
785  if (!resource->fatalError() && !resource->isAllocated()) {
786  FUShmRawCell* rawCell = currentMessageToWrite->rawCell();
787  rawCell->initialize(fuResourceId);
788  resource->allocate(rawCell);
789 
790  timeval now;
791  gettimeofday(&now, 0);
792 
793  frb_->setRBTimeStamp(
794  ((uint64_t) (now.tv_sec) << 32) + (uint64_t) (now.tv_usec));
795 
797 
798  if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
799  resource->doCrcCheck(true);
800  else
801  resource->doCrcCheck(false);
802  }
803 
804  // keep building this resource if it is healthy
805  if (!resource->fatalError()) {
806  resource->process(bufRef);
807 
808  lock();
809  nbErrors_ += resource->nbErrors();
810  nbCrcErrors_ += resource->nbCrcErrors();
811  unlock();
812 
813  // make resource available for pick-up
814  if (resource->isComplete()) {
815  lock();
816  nbCompleted_++;
817  nbPending_--;
818  unlock();
819  /*
820  cout << "POSTING to q: msqid = " << msq_.id()
821  << ", message buf is allocated for: "
822  << currentMessageToWrite->msize()
823  << "... Actually copied: "
824  << currentMessageToWrite->usedSize() << " fuResourceID = "
825  << currentMessageToWrite->rawCell()->fuResourceId()
826  << " buResourceID = "
827  << currentMessageToWrite->rawCell()->buResourceId() << endl;
828  */
829 
830  //msq_.post(*currentMessageToWrite);
831 
832  try {
833  msq_.postLength(*currentMessageToWrite,
834  currentMessageToWrite->usedSize());
835  } catch (...) {
836  string errmsg = "Failed to post message to Queue!";
837  LOG4CPLUS_FATAL(log_, errmsg);
838  XCEPT_RAISE(evf::Exception, errmsg);
839 
840  }
841 
842  // CURRENT RECEIVERS
843  /*
844  vector<int> receivers = msq_.getReceivers();
845  cout << "--Receiving processes: ";
846  for (unsigned int i = 0; i < receivers.size(); ++i)
847  cout << i << " " << receivers[i];
848  cout << endl;
849  */
850 
851  eventComplete = true;
852  }
853 
854  }
855  // bad event, release msg, and the whole resource if this was the last one
856  if (resource->fatalError()) {
857  bool lastMsg = isLastMessageOfEvent(bufRef);
858  if (lastMsg) {
859  resource->release(false);
860  lock();
861  freeResourceIds_.push(fuResourceId);
862  nbDiscarded_++;
863  nbLost_++;
864  nbPending_--;
865  unlock();
866  bu_->sendDiscard(buResourceId);
867  sendAllocate();
868  }
869  bufRef->release(); // this should now be safe re: appendToSuperFrag as corrupted blocks will be removed...
870  }
871 
872  return eventComplete;
873 }
int postLength(MsgBuf &ptr, unsigned int length)
Definition: MasterQueue.cc:36
void process(MemRef_t *bufRef)
Definition: FUResource.cc:126
bool fatalError() const
Definition: FUResource.h:110
UInt_t doCrcCheck_
Definition: IPCMethod.h:355
void setRBEventCount(uint32_t evtcnt)
void sendDiscard(UInt_t buResourceId)
Definition: BUProxy.cc:108
UInt_t nbErrors_
Definition: IPCMethod.h:377
UInt_t nbDiscarded_
Definition: IPCMethod.h:367
UInt_t nbErrors(bool reset=true)
Definition: FUResource.cc:308
void sendAllocate()
Definition: IPCMethod.cc:111
bool isLastMessageOfEvent(MemRef_t *bufRef)
Definition: IPCMethod.cc:248
log4cplus::Logger log_
Definition: IPCMethod.h:342
void release(bool detachResource)
Definition: FUResource.cc:82
FUShmRawCell * rawCell()
Definition: RawMsgBuf.h:35
RawMsgBuf * getMsgToWrite()
Definition: RawCache.cc:61
void initialize(unsigned int index)
Definition: FUShmRawCell.cc:73
UInt_t nbCompleted_
Definition: IPCMethod.h:361
UInt_t nbPending_
Definition: IPCMethod.h:360
UInt_t nbCrcErrors_
Definition: IPCMethod.h:378
FUResourceVec_t resources_
Definition: IPCMethod.h:393
bool isComplete() const
Definition: FUResource.h:221
UInt_t nbLost_
Definition: IPCMethod.h:368
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned int usedSize()
Definition: RawMsgBuf.h:41
unsigned long long uint64_t
Definition: Time.h:15
bool isAllocated() const
Definition: FUResource.h:113
EvffedFillerRB * frb_
Definition: IPCMethod.h:390
void doCrcCheck(bool doCrcCheck)
Definition: FUResource.h:104
UInt_t nbAllocated_
Definition: IPCMethod.h:359
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
UInt_t nbCrcErrors(bool reset=true)
Definition: FUResource.cc:316
void unlock()
Definition: IPCMethod.h:314
void setRBTimeStamp(uint64_t ts)
void allocate(FUShmRawCell *shmCell)
Definition: FUResource.cc:63
BUProxy * bu_
Definition: IPCMethod.h:339
vector< UInt_t > FUResourceQueue::cellEvtNumbers ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1294 of file FUResourceQueue.cc.

References query::result.

1294  {
1295  vector<UInt_t> result;
1296 
1297  /*
1298  if (0 != shmBuffer_) {
1299  UInt_t n = nbResources();
1300  shmBuffer_->lock();
1301  for (UInt_t i = 0; i < n; i++)
1302  result.push_back(shmBuffer_->evtNumber(i));
1303  shmBuffer_->unlock();
1304  }
1305  */
1306  return result;
1307 }
tuple result
Definition: query.py:137
vector< pid_t > FUResourceQueue::cellPrcIds ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1311 of file FUResourceQueue.cc.

References query::result.

1311  {
1312  vector<pid_t> result;
1313 
1314  /*
1315  if (0 != shmBuffer_) {
1316  UInt_t n = nbResources();
1317  shmBuffer_->lock();
1318  for (UInt_t i = 0; i < n; i++)
1319  result.push_back(shmBuffer_->evtPrcId(i));
1320  shmBuffer_->unlock();
1321  }
1322  */
1323  return result;
1324 }
tuple result
Definition: query.py:137
vector< string > FUResourceQueue::cellStates ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1220 of file FUResourceQueue.cc.

References query::result.

1220  {
1221  vector<string> result;
1222  /*
1223  if (0 != shmBuffer_) {
1224  UInt_t n = nbResources();
1225  shmBuffer_->lock();
1226  for (UInt_t i = 0; i < n; i++) {
1227  evt::State_t state = shmBuffer_->evtState(i);
1228  if (state == evt::EMPTY)
1229  result.push_back("EMPTY");
1230  else if (state == evt::STOP)
1231  result.push_back("STOP");
1232  else if (state == evt::LUMISECTION)
1233  result.push_back("LUMISECTION");
1234  else if (state == evt::RAWWRITING)
1235  result.push_back("RAWWRITING");
1236  else if (state == evt::RAWWRITTEN)
1237  result.push_back("RAWWRITTEN");
1238  else if (state == evt::RAWREADING)
1239  result.push_back("RAWREADING");
1240  else if (state == evt::RAWREAD)
1241  result.push_back("RAWREAD");
1242  else if (state == evt::PROCESSING)
1243  result.push_back("PROCESSING");
1244  else if (state == evt::PROCESSED)
1245  result.push_back("PROCESSED");
1246  else if (state == evt::RECOWRITING)
1247  result.push_back("RECOWRITING");
1248  else if (state == evt::RECOWRITTEN)
1249  result.push_back("RECOWRITTEN");
1250  else if (state == evt::SENDING)
1251  result.push_back("SENDING");
1252  else if (state == evt::SENT)
1253  result.push_back("SENT");
1254  else if (state == evt::DISCARDING)
1255  result.push_back("DISCARDING");
1256  }
1257  shmBuffer_->unlock();
1258  }
1259  */
1260  return result;
1261 }
tuple result
Definition: query.py:137
vector< time_t > FUResourceQueue::cellTimeStamps ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1328 of file FUResourceQueue.cc.

References query::result.

1328  {
1329  vector<time_t> result;
1330 
1331  /*
1332  if (0 != shmBuffer_) {
1333  UInt_t n = nbResources();
1334  shmBuffer_->lock();
1335  for (UInt_t i = 0; i < n; i++)
1336  result.push_back(shmBuffer_->evtTimeStamp(i));
1337  shmBuffer_->unlock();
1338  }
1339  */
1340  return result;
1341 }
tuple result
Definition: query.py:137
void FUResourceQueue::clear ( )
virtual
vector< pid_t > FUResourceQueue::clientPrcIds ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1187 of file FUResourceQueue.cc.

References query::result.

1187  {
1188  vector<pid_t> result;
1189 
1190  /*
1191  if (0 != shmBuffer_) {
1192  UInt_t n = nbClients();
1193  for (UInt_t i = 0; i < n; i++)
1194  result.push_back(shmBuffer_->clientPrcId(i));
1195  }
1196  */
1197  return result;
1198 }
tuple result
Definition: query.py:137
string FUResourceQueue::clientPrcIdsAsString ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1202 of file FUResourceQueue.cc.

1202  {
1203  stringstream ss;
1204 
1205  /*
1206  if (0 != shmBuffer_) {
1207  UInt_t n = nbClients();
1208  for (UInt_t i = 0; i < n; i++) {
1209  if (i > 0)
1210  ss << ",";
1211  ss << shmBuffer_->clientPrcId(i);
1212  }
1213  }
1214  */
1215  return ss.str();
1216 }
bool FUResourceQueue::discard ( )
virtual

Has to be implemented by subclasses, according to IPC type.

isHalting_

Implements evf::IPCMethod.

Definition at line 483 of file FUResourceQueue.cc.

References gather_cfg::cout, evf::DISCARD_RAW_MESSAGE_TYPE, evf::IPCMethod::freeResourceIds_, evf::RawCache::getInstance(), evf::IPCMethod::lock(), msq_, evf::MasterQueue::rcvQuiet(), evf::RawCache::releaseMsg(), evf::IPCMethod::resources_, evf::IPCMethod::sendAllocate(), evf::IPCMethod::sendDiscard(), stor::utils::sleep(), and evf::IPCMethod::unlock().

483  {
484 
485  bool reschedule = true;
486 
487  /*
488  * DISCARDING raw msg buffers
489  */
490  MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
491  bool rcvSuccess = msq_.rcvQuiet(discardRaw);
492 
493  if (!rcvSuccess) {
494  cout << "RCV failed!" << endl;
495  ::sleep(5);
496  return reschedule;
497  }
498 
499  unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
500  unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
501  + sizeof(unsigned int));
502 
503  unsigned int buResourceId = *pBuID;
504  unsigned int fuResourceId = *pFuID;
505 
506  cout << "Discard received for buResourceID: " << buResourceId
507  << " fuResourceID " << fuResourceId << endl << endl;
508 
509  //FUShmRawCell* cell = shmBuffer_->rawCellToDiscard();
510  //evt::State_t state = shmBuffer_->evtState(cell->index());
511 
512  /*
513  bool shutDown = (state == evt::STOP);
514  bool isLumi = (state == evt::LUMISECTION);
515  */
516  //UInt_t fuResourceId = cell->fuResourceId();
517  //UInt_t buResourceId = cell->buResourceId();
518 
519  // cout << "discard loop, state, shutDown, isLumi " << state << " "
520  // << shutDown << " " << isLumi << endl;
521  // cout << "resource ids " << fuResourceId << " " << buResourceId << endl;
522 
523  /*
524  if (shutDown) {
525  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
526  if (nbClientsToShutDown_ > 0)
527  --nbClientsToShutDown_;
528  if (nbClientsToShutDown_ == 0) {
529  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
530  isActive_ = false;
531  reschedule = false;
532  }
533  }
534  */
535 
536  //shmBuffer_->discardRawCell(cell);
537 
538  //if (!shutDown && !isLumi) {
539  if (true) {
540  // (false = no shmdt)
541  resources_[fuResourceId]->release(false);
542  // also release space in RawCache
543  RawCache::getInstance()->releaseMsg(fuResourceId);
544 
545  lock();
546  freeResourceIds_.push(fuResourceId);
547  assert(freeResourceIds_.size() <= resources_.size());
548  unlock();
549 
550  if (true) {
551  sendDiscard(buResourceId);
552  if (true)
553  sendAllocate();
554  }
555  }
556 
557  // concept shutdown cycle
558  /*
559  if (!reschedule) {
560  cout << " entered shutdown cycle " << endl;
561  shmBuffer_->writeRecoEmptyEvent();
562  UInt_t count = 0;
563  while (count < 100) {
564  cout << " shutdown cycle " << shmBuffer_->nClients() << " "
565  << FUShmBuffer::shm_nattch(shmBuffer_->shmid())
566  << endl;
567  if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
568  shmBuffer_->shmid()) == 1) {
569  // isReadyToShutDown_ = true;
570  break;
571  } else {
572  count++;
573  cout << " shutdown cycle attempt " << count << endl;
574  LOG4CPLUS_DEBUG(
575  log_,
576  "FUResourceTable: Wait for all clients to detach,"
577  << " nClients=" << shmBuffer_->nClients()
578  << " nattch=" << FUShmBuffer::shm_nattch(
579  shmBuffer_->shmid()) << " (" << count << ")");
580  ::usleep(shutdownTimeout_);
581  if (count * shutdownTimeout_ > 10000000)
582  LOG4CPLUS_WARN(
583  log_,
584  "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
585  << " nClients=" << shmBuffer_->nClients()
586  << " nattch=" << FUShmBuffer::shm_nattch(
587  shmBuffer_->shmid()) << " (" << count
588  << ")");
589 
590  }
591  }
592  bool allEmpty = false;
593  cout << "Checking if all dqm cells are empty " << endl;
594  while (!allEmpty) {
595  UInt_t n = nbDqmCells_;
596  allEmpty = true;
597  shmBuffer_->lock();
598  for (UInt_t i = 0; i < n; i++) {
599  dqm::State_t state = shmBuffer_->dqmState(i);
600  if (state != dqm::EMPTY)
601  allEmpty = false;
602  }
603  shmBuffer_->unlock();
604  }
605  cout << "Making sure there are no dqm pending discards "
606  << endl;
607  if (nbPendingSMDqmDiscards_ != 0) {
608  LOG4CPLUS_WARN(
609  log_,
610  "FUResourceTable: pending DQM discards not zero: ="
611  << nbPendingSMDqmDiscards_
612  << " while cells are all empty. This may cause problems at next start ");
613 
614  }
615  shmBuffer_->writeDqmEmptyEvent();
616  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
617  // sendDqm loop has been shut down as well
618  }
619  */
620 
621  return reschedule;
622 }
void sendDiscard(UInt_t buResourceId)
Definition: IPCMethod.cc:150
void sendAllocate()
Definition: IPCMethod.cc:111
static const unsigned int DISCARD_RAW_MESSAGE_TYPE
Definition: msq_constants.h:19
void sleep(Duration_t)
Definition: Utils.h:163
static RawCache * getInstance()
Definition: RawCache.cc:25
FUResourceVec_t resources_
Definition: IPCMethod.h:393
void releaseMsg(unsigned int fuResourceId)
Definition: RawCache.cc:77
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void unlock()
Definition: IPCMethod.h:314
tuple cout
Definition: gather_cfg.py:121
bool FUResourceQueue::discardDataEvent ( MemRef_t * bufRef)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 878 of file FUResourceQueue.cc.

878  {
879 
880  /*
881  I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
882  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
883  UInt_t recoIndex = msg->rbBufferID;
884 
885  if (acceptSMDataDiscard_[recoIndex]) {
886  lock();
887  nbPendingSMDiscards_--;
888  unlock();
889  acceptSMDataDiscard_[recoIndex] = false;
890 
891  if (!isHalting_) {
892  shmBuffer_->discardRecoCell(recoIndex);
893  bufRef->release();
894  }
895  } else {
896  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
897  }
898 
899  if (isHalting_) {
900  bufRef->release();
901  return false;
902  }
903  */
904  return true;
905 }
bool FUResourceQueue::discardDataEventWhileHalting ( MemRef_t * bufRef)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 909 of file FUResourceQueue.cc.

909  {
910 
911  /*
912  I2O_FU_DATA_DISCARD_MESSAGE_FRAME *msg;
913  msg = (I2O_FU_DATA_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
914  UInt_t recoIndex = msg->rbBufferID;
915 
916  if (acceptSMDataDiscard_[recoIndex]) {
917  lock();
918  nbPendingSMDiscards_--;
919  unlock();
920  acceptSMDataDiscard_[recoIndex] = false;
921 
922  if (!isHalting_) {
923  shmBuffer_->discardRecoCell(recoIndex);
924  bufRef->release();
925  }
926  } else {
927  LOG4CPLUS_ERROR(log_, "Spurious DATA discard by StorageManager, skip!");
928  }
929 
930  if (isHalting_) {
931  bufRef->release();
932  return false;
933  }
934  */
935  return true;
936 }
bool FUResourceQueue::discardDqmEvent ( MemRef_t * bufRef)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 941 of file FUResourceQueue.cc.

941  {
942  /*
943  I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
944  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
945  UInt_t dqmIndex = msg->rbBufferID;
946  unsigned int ntries = 0;
947  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
948  LOG4CPLUS_WARN(
949  log_,
950  "DQM discard for cell " << dqmIndex
951  << " which is not yer in SENT state - waiting");
952  ::usleep(10000);
953  if (ntries++ > 10) {
954  LOG4CPLUS_ERROR(
955  log_,
956  "DQM cell " << dqmIndex
957  << " discard timed out while cell still in state "
958  << shmBuffer_->dqmState(dqmIndex));
959  bufRef->release();
960  return true;
961  }
962  }
963  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
964  acceptSMDqmDiscard_[dqmIndex]--;
965  if (nbPendingSMDqmDiscards_ > 0) {
966  nbPendingSMDqmDiscards_--;
967  } else {
968  LOG4CPLUS_WARN (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
969  << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
970  }
971 
972  if (!isHalting_) {
973  shmBuffer_->discardDqmCell(dqmIndex);
974  bufRef->release();
975  }
976 
977  }
978  else {
979  LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
980  << " from StorageManager while cell is not accepting discards");
981  }
982 
983  if (isHalting_) {
984  bufRef->release();
985  return false;
986  }
987  */
988  return true;
989 }
bool FUResourceQueue::discardDqmEventWhileHalting ( MemRef_t * bufRef)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 993 of file FUResourceQueue.cc.

993  {
994  /*
995  I2O_FU_DQM_DISCARD_MESSAGE_FRAME *msg;
996  msg = (I2O_FU_DQM_DISCARD_MESSAGE_FRAME*) bufRef->getDataLocation();
997  UInt_t dqmIndex = msg->rbBufferID;
998  unsigned int ntries = 0;
999  while (shmBuffer_->dqmState(dqmIndex) != dqm::SENT) {
1000  LOG4CPLUS_WARN(
1001  log_,
1002  "DQM discard for cell " << dqmIndex
1003  << " which is not yer in SENT state - waiting");
1004  ::usleep(10000);
1005  if (ntries++ > 10) {
1006  LOG4CPLUS_ERROR(
1007  log_,
1008  "DQM cell " << dqmIndex
1009  << " discard timed out while cell still in state "
1010  << shmBuffer_->dqmState(dqmIndex));
1011  bufRef->release();
1012  return true;
1013  }
1014  }
1015  if (acceptSMDqmDiscard_[dqmIndex] > 0) {
1016  acceptSMDqmDiscard_[dqmIndex]--;
1017  if (nbPendingSMDqmDiscards_ > 0) {
1018  nbPendingSMDqmDiscards_--;
1019  } else {
1020  LOG4CPLUS_WARN (log_,"Spurious??? DQM discard by StorageManager, index " << dqmIndex
1021  << " cell state " << shmBuffer_->dqmState(dqmIndex) << " accept flag " << acceptSMDqmDiscard_[dqmIndex];);
1022  }
1023 
1024  if (!isHalting_) {
1025  shmBuffer_->discardDqmCell(dqmIndex);
1026  bufRef->release();
1027  }
1028 
1029  }
1030  else {
1031  LOG4CPLUS_ERROR(log_,"Spurious DQM discard for cell " << dqmIndex
1032  << " from StorageManager while cell is not accepting discards");
1033  }
1034 
1035  if (isHalting_) {
1036  bufRef->release();
1037  return false;
1038  }
1039  */
1040  return true;
1041 }
bool FUResourceQueue::discardWhileHalting ( bool  sendDiscards)
virtual

Has to be implemented by subclasses, according to IPC type.

isHalting_

Implements evf::IPCMethod.

Definition at line 625 of file FUResourceQueue.cc.

References gather_cfg::cout, evf::DISCARD_RAW_MESSAGE_TYPE, evf::IPCMethod::freeResourceIds_, evf::RawCache::getInstance(), evf::IPCMethod::lock(), msq_, evf::MasterQueue::rcvQuiet(), evf::RawCache::releaseMsg(), evf::IPCMethod::resources_, evf::IPCMethod::sendAllocate(), evf::IPCMethod::sendDiscard(), stor::utils::sleep(), and evf::IPCMethod::unlock().

625  {
626 
627  bool reschedule = true;
628 
629  /*
630  * DISCARDING raw msg buffers
631  */
632  MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
633  bool rcvSuccess = msq_.rcvQuiet(discardRaw);
634 
635  if (!rcvSuccess) {
636  cout << "RCV failed!" << endl;
637  ::sleep(5);
638  return reschedule;
639  }
640 
641  unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
642  unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
643  + sizeof(unsigned int));
644 
645  unsigned int buResourceId = *pBuID;
646  unsigned int fuResourceId = *pFuID;
647 
648  cout << "Discard received for buResourceID: " << buResourceId
649  << " fuResourceID " << fuResourceId << endl << endl;
650 
651  //FUShmRawCell* cell = shmBuffer_->rawCellToDiscard();
652  //evt::State_t state = shmBuffer_->evtState(cell->index());
653 
654  /*
655  bool shutDown = (state == evt::STOP);
656  bool isLumi = (state == evt::LUMISECTION);
657  */
658  //UInt_t fuResourceId = cell->fuResourceId();
659  //UInt_t buResourceId = cell->buResourceId();
660 
661  // cout << "discard loop, state, shutDown, isLumi " << state << " "
662  // << shutDown << " " << isLumi << endl;
663  // cout << "resource ids " << fuResourceId << " " << buResourceId << endl;
664 
665  /*
666  if (shutDown) {
667  LOG4CPLUS_INFO(log_, "nbClientsToShutDown = " << nbClientsToShutDown_);
668  if (nbClientsToShutDown_ > 0)
669  --nbClientsToShutDown_;
670  if (nbClientsToShutDown_ == 0) {
671  LOG4CPLUS_INFO(log_, "Don't reschedule discard-workloop.");
672  isActive_ = false;
673  reschedule = false;
674  }
675  }
676  */
677 
678  //shmBuffer_->discardRawCell(cell);
679 
680  //if (!shutDown && !isLumi) {
681  if (true) {
682  // (false = no shmdt)
683  resources_[fuResourceId]->release(false);
684  // also release space in RawCache
685  RawCache::getInstance()->releaseMsg(fuResourceId);
686 
687  lock();
688  freeResourceIds_.push(fuResourceId);
689  assert(freeResourceIds_.size() <= resources_.size());
690  unlock();
691 
692  if (false) {
693  sendDiscard(buResourceId);
694  if (false)
695  sendAllocate();
696  }
697  }
698 
699  // concept shutdown cycle
700  /*
701  if (!reschedule) {
702  cout << " entered shutdown cycle " << endl;
703  shmBuffer_->writeRecoEmptyEvent();
704  UInt_t count = 0;
705  while (count < 100) {
706  cout << " shutdown cycle " << shmBuffer_->nClients() << " "
707  << FUShmBuffer::shm_nattch(shmBuffer_->shmid())
708  << endl;
709  if (shmBuffer_->nClients() == 0 && FUShmBuffer::shm_nattch(
710  shmBuffer_->shmid()) == 1) {
711  // isReadyToShutDown_ = true;
712  break;
713  } else {
714  count++;
715  cout << " shutdown cycle attempt " << count << endl;
716  LOG4CPLUS_DEBUG(
717  log_,
718  "FUResourceTable: Wait for all clients to detach,"
719  << " nClients=" << shmBuffer_->nClients()
720  << " nattch=" << FUShmBuffer::shm_nattch(
721  shmBuffer_->shmid()) << " (" << count << ")");
722  ::usleep(shutdownTimeout_);
723  if (count * shutdownTimeout_ > 10000000)
724  LOG4CPLUS_WARN(
725  log_,
726  "FUResourceTable:LONG Wait (>10s) for all clients to detach,"
727  << " nClients=" << shmBuffer_->nClients()
728  << " nattch=" << FUShmBuffer::shm_nattch(
729  shmBuffer_->shmid()) << " (" << count
730  << ")");
731 
732  }
733  }
734  bool allEmpty = false;
735  cout << "Checking if all dqm cells are empty " << endl;
736  while (!allEmpty) {
737  UInt_t n = nbDqmCells_;
738  allEmpty = true;
739  shmBuffer_->lock();
740  for (UInt_t i = 0; i < n; i++) {
741  dqm::State_t state = shmBuffer_->dqmState(i);
742  if (state != dqm::EMPTY)
743  allEmpty = false;
744  }
745  shmBuffer_->unlock();
746  }
747  cout << "Making sure there are no dqm pending discards "
748  << endl;
749  if (nbPendingSMDqmDiscards_ != 0) {
750  LOG4CPLUS_WARN(
751  log_,
752  "FUResourceTable: pending DQM discards not zero: ="
753  << nbPendingSMDqmDiscards_
754  << " while cells are all empty. This may cause problems at next start ");
755 
756  }
757  shmBuffer_->writeDqmEmptyEvent();
758  isReadyToShutDown_ = true; // moved here from within the first while loop to make sure the
759  // sendDqm loop has been shut down as well
760  }
761  */
762 
763  return reschedule;
764 }
void sendDiscard(UInt_t buResourceId)
Definition: IPCMethod.cc:150
void sendAllocate()
Definition: IPCMethod.cc:111
static const unsigned int DISCARD_RAW_MESSAGE_TYPE
Definition: msq_constants.h:19
void sleep(Duration_t)
Definition: Utils.h:163
static RawCache * getInstance()
Definition: RawCache.cc:25
FUResourceVec_t resources_
Definition: IPCMethod.h:393
void releaseMsg(unsigned int fuResourceId)
Definition: RawCache.cc:77
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void unlock()
Definition: IPCMethod.h:314
tuple cout
Definition: gather_cfg.py:121
vector< string > FUResourceQueue::dqmCellStates ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1264 of file FUResourceQueue.cc.

References query::result.

1264  {
1265  vector<string> result;
1266 
1267  /*
1268  if (0 != shmBuffer_) {
1269  UInt_t n = nbDqmCells_;
1270  shmBuffer_->lock();
1271  for (UInt_t i = 0; i < n; i++) {
1272  dqm::State_t state = shmBuffer_->dqmState(i);
1273  if (state == dqm::EMPTY)
1274  result.push_back("EMPTY");
1275  else if (state == dqm::WRITING)
1276  result.push_back("WRITING");
1277  else if (state == dqm::WRITTEN)
1278  result.push_back("WRITTEN");
1279  else if (state == dqm::SENDING)
1280  result.push_back("SENDING");
1281  else if (state == dqm::SENT)
1282  result.push_back("SENT");
1283  else if (state == dqm::DISCARDING)
1284  result.push_back("DISCARDING");
1285  }
1286  shmBuffer_->unlock();
1287  }
1288  */
1289  return result;
1290 }
tuple result
Definition: query.py:137
void FUResourceQueue::dropEvent ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1060 of file FUResourceQueue.cc.

1060  {
1061  /*
1062  FUShmRawCell* cell = shmBuffer_->rawCellToRead();
1063  UInt_t fuResourceId = cell->fuResourceId();
1064  shmBuffer_->finishReadingRawCell(cell);
1065  shmBuffer_->scheduleRawCellForDiscard(fuResourceId);
1066  */
1067 }
void evf::FUResourceQueue::dumpEvent ( evf::FUShmRawCell * cell)
bool FUResourceQueue::handleCrashedEP ( UInt_t  runNumber,
pid_t  pid 
)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1071 of file FUResourceQueue.cc.

1071  {
1072  bool retval = false;
1073  /*
1074  vector<pid_t> pids = cellPrcIds();
1075  UInt_t iRawCell = pids.size();
1076  for (UInt_t i = 0; i < pids.size(); i++) {
1077  if (pid == pids[i]) {
1078  iRawCell = i;
1079  break;
1080  }
1081  }
1082 
1083  if (iRawCell < pids.size()) {
1084  shmBuffer_->writeErrorEventData(runNumber, pid, iRawCell);
1085  retval = true;
1086  } else
1087  LOG4CPLUS_WARN(log_,
1088  "No raw data to send to error stream for process " << pid);
1089  shmBuffer_->removeClientPrcId(pid);
1090  */
1091  return retval;
1092 }
void FUResourceQueue::initialize ( bool  segmentationMode,
UInt_t  nbRawCells,
UInt_t  nbRecoCells,
UInt_t  nbDqmCells,
UInt_t  rawCellSize,
UInt_t  recoCellSize,
UInt_t  dqmCellSize 
)
throw (evf::Exception)

Definition at line 77 of file FUResourceQueue.cc.

References hitfit::clear(), evf::RawCache::getInstance(), i, lumiQueryAPI::msg, and evf::FUResource::release().

79  {
80 
81  rawCellSize_ = rawCellSize;
82  recoCellSize_ = recoCellSize;
83  dqmCellSize_ = dqmCellSize;
84 
85  clear();
86 
87  if (0 == &msq_ || 0 == msq_.id()) {
88  string msg = "CREATION OF MESSAGE QUEUE FAILED!";
89  LOG4CPLUS_FATAL(log_, msg);
90  XCEPT_RAISE(evf::Exception, msg);
91  }
92 
94  cache_->initialise(nbRawCells, rawCellSize);
95 
96  // SEC need cap on max resources
97  for (UInt_t i = 0; i < nbRawCells_; i++) {
98  FUResource* newResource = new FUResource(i, log_, frb_, app_);
99  newResource->release(false);
100  resources_.push_back(newResource);
101  freeResourceIds_.push(i);
102  }
103 
104  //acceptSMDataDiscard_ = new bool[nbRecoCells];
105  //acceptSMDqmDiscard_ = new int[nbDqmCells];
106 
107  resetCounters();
108 }
int i
Definition: DBlmapReader.cc:9
log4cplus::Logger log_
Definition: IPCMethod.h:342
void release(bool detachResource)
Definition: FUResource.cc:82
UInt_t nbRawCells_
Definition: IPCMethod.h:345
static RawCache * getInstance()
Definition: RawCache.cc:25
FUResourceVec_t resources_
Definition: IPCMethod.h:393
xdaq::Application * app_
Definition: IPCMethod.h:391
unsigned int UInt_t
Definition: FUTypes.h:12
void initialise(unsigned int nMsgs, unsigned int cellSize)
Definition: RawCache.cc:32
EvffedFillerRB * frb_
Definition: IPCMethod.h:390
std::queue< UInt_t > freeResourceIds_
Definition: IPCMethod.h:348
void FUResourceQueue::lastResort ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1348 of file FUResourceQueue.cc.

1348  {
1349 
1350  /*
1351  cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1352  << " more rawcells to read " << endl;
1353  while (shmBuffer_->nbRawCellsToRead() != 0) {
1354  FUShmRawCell* newCell = shmBuffer_->rawCellToRead();
1355  cout << "lastResort: " << shmBuffer_->nbRawCellsToRead()
1356  << endl;
1357  shmBuffer_->scheduleRawEmptyCellForDiscardServerSide(newCell);
1358  cout << "lastResort: schedule raw cell for discard" << endl;
1359  }
1360  */
1361 }
UInt_t FUResourceQueue::nbClients ( ) const
virtual

Implements evf::IPCMethod.

Definition at line 1175 of file FUResourceQueue.cc.

References query::result.

1175  {
1176  UInt_t result(0);
1177 
1178  /*
1179  if (0 != shmBuffer_)
1180  result = shmBuffer_->nClients();
1181  */
1182  return result;
1183 }
tuple result
Definition: query.py:137
unsigned int UInt_t
Definition: FUTypes.h:12
UInt_t evf::FUResourceQueue::nbResources ( ) const
inline virtual

Implements evf::IPCMethod.

Definition at line 96 of file FUResourceQueue.h.

References evf::IPCMethod::resources_.

96  {
97  return resources_.size();
98  }
FUResourceVec_t resources_
Definition: IPCMethod.h:393
void FUResourceQueue::postEndOfLumiSection ( MemRef_t * bufRef)
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1045 of file FUResourceQueue.cc.

1045  {
1046  /*
1047  I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME
1048  *msg =
1049  (I2O_EVM_END_OF_LUMISECTION_MESSAGE_FRAME *) bufRef->getDataLocation();
1050  //make sure to fill up the shmem so no process will miss it
1051  // but processes will have to handle duplicates
1052 
1053  for (unsigned int i = 0; i < nbRawCells_; i++)
1054  shmBuffer_->writeRawLumiSectionEvent(msg->lumiSection);
1055  */
1056 }
void FUResourceQueue::resetCounters ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1144 of file FUResourceQueue.cc.

References evf::IPCMethod::nbAllocated_, evf::IPCMethod::nbAllocSent_, evf::IPCMethod::nbCompleted_, evf::IPCMethod::nbCrcErrors_, evf::IPCMethod::nbDiscarded_, evf::IPCMethod::nbErrors_, evf::IPCMethod::nbLost_, evf::IPCMethod::nbPending_, evf::IPCMethod::nbPendingSMDiscards_, evf::IPCMethod::nbPendingSMDqmDiscards_, evf::IPCMethod::nbSent_, evf::IPCMethod::nbSentDqm_, evf::IPCMethod::nbSentError_, evf::IPCMethod::sumOfSizes_, and evf::IPCMethod::sumOfSquares_.

1144  {
1145  /*
1146  if (0 != shmBuffer_) {
1147  for (UInt_t i = 0; i < shmBuffer_->nRecoCells(); i++)
1148  acceptSMDataDiscard_[i] = false;
1149  for (UInt_t i = 0; i < shmBuffer_->nDqmCells(); i++)
1150  acceptSMDqmDiscard_[i] = 0;
1151  }
1152  */
1154  nbCompleted_ = 0;
1155  nbSent_ = 0;
1156  nbSentError_ = 0;
1157  nbSentDqm_ = 0;
1160  nbDiscarded_ = 0;
1161  nbLost_ = 0;
1162 
1163  nbErrors_ = 0;
1164  nbCrcErrors_ = 0;
1165  nbAllocSent_ = 0;
1166 
1167  sumOfSquares_ = 0;
1168  sumOfSizes_ = 0;
1169  //isStopping_ = false;
1170 
1171 }
uint64_t sumOfSquares_
Definition: IPCMethod.h:381
UInt_t nbErrors_
Definition: IPCMethod.h:377
UInt_t nbDiscarded_
Definition: IPCMethod.h:367
UInt_t nbSentDqm_
Definition: IPCMethod.h:364
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
UInt_t nbCompleted_
Definition: IPCMethod.h:361
UInt_t nbPending_
Definition: IPCMethod.h:360
UInt_t nbSentError_
Definition: IPCMethod.h:363
UInt_t nbCrcErrors_
Definition: IPCMethod.h:378
UInt_t nbLost_
Definition: IPCMethod.h:368
std::atomic< int > nbPendingSMDqmDiscards_
Definition: IPCMethod.h:366
UInt_t nbAllocSent_
Definition: IPCMethod.h:379
UInt_t sumOfSizes_
Definition: IPCMethod.h:382
UInt_t nbSent_
Definition: IPCMethod.h:362
UInt_t nbAllocated_
Definition: IPCMethod.h:359
void evf::FUResourceQueue::resetIPC ( )
inline virtual

Resets the underlying IPC method to its initial state.

Implements evf::IPCMethod.

Definition at line 114 of file FUResourceQueue.h.

114  {
115  // nothing here
116  }
bool FUResourceQueue::sendData ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 112 of file FUResourceQueue.cc.

References gather_cfg::cout, alignCSCRings::e, evf::FUShmRecoCell::eventSize(), evf::FUShmRecoCell::evtNumber(), edm::hlt::Exception, evf::FUShmRecoCell::fuGuid(), evf::FUShmRecoCell::fuProcessId(), evf::FUShmRecoCell::index(), evf::IPCMethod::lock(), evf::IPCMethod::log_, msq_, evf::IPCMethod::nbPendingSMDiscards_, evf::IPCMethod::nbSent(), evf::IPCMethod::nbSent_, evf::IPCMethod::nbSentError_, evf::FUShmRecoCell::nExpectedEPs(), evf::FUShmRecoCell::outModId(), evf::FUShmRecoCell::payloadAddr(), evf::FUShmRecoCell::rawCellIndex(), evf::MasterQueue::rcvQuiet(), evf::RECO_MESSAGE_TYPE, evf::RecoMsgBuf::recoCell(), recoCellSize_, evf::IPCMethod::resources_, evf::FUShmRecoCell::runNumber(), evf::IPCMethod::runNumber_, evf::IPCMethod::sendDataEvent(), evf::IPCMethod::sendErrorEvent(), evf::IPCMethod::sendInitMessage(), stor::utils::sleep(), evf::FUShmRecoCell::type(), and evf::IPCMethod::unlock().

112  {
113  bool reschedule = true;
114 
115  //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
117 
118  bool rcvSuccess = msq_.rcvQuiet(recoMsg);
119  if (!rcvSuccess) {
120  cout << "RCV failed!" << endl;
121  ::sleep(5);
122  return reschedule;
123  }
124 
125  FUShmRecoCell* cell = recoMsg.recoCell();
126 
127  // event size 0 -> stop
128 
129  if (0 == cell->eventSize()) {
130  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
131  //UInt_t cellIndex = cell->index();
132  /*
133  shmBuffer_->finishReadingRecoCell(cell);
134  shmBuffer_->discardRecoCell(cellIndex);
135  */
136  reschedule = false;
137 
138  // halting
139  } else if (/*isHalting_*/false) {
140  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
141  //UInt_t cellIndex = cell->index();
142  /*
143  shmBuffer_->finishReadingRecoCell(cell);
144  shmBuffer_->discardRecoCell(cellIndex);
145  */
146 
147  } else {
148  try {
149  //init message
150  if (cell->type() == 0) {
151  UInt_t cellIndex = cell->index();
152  UInt_t cellOutModId = cell->outModId();
153  UInt_t cellFUProcId = cell->fuProcessId();
154  UInt_t cellFUGuid = cell->fuGuid();
155  UChar_t* cellPayloadAddr = cell->payloadAddr();
156  UInt_t cellEventSize = cell->eventSize();
157  UInt_t cellExpectedEPs = cell->nExpectedEPs();
158  //shmBuffer_->finishReadingRecoCell(cell);
159 
160  lock();
162  unlock();
163 
164  sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
165  cellFUGuid, cellPayloadAddr, cellEventSize,
166  cellExpectedEPs);
167 
168  //
169  // DATA event message
170  //
171  } else if (cell->type() == 1) {
172  UInt_t cellIndex = cell->index();
173  UInt_t cellRawIndex = cell->rawCellIndex();
174  UInt_t cellRunNumber = cell->runNumber();
175  UInt_t cellEvtNumber = cell->evtNumber();
176  UInt_t cellOutModId = cell->outModId();
177  UInt_t cellFUProcId = cell->fuProcessId();
178  UInt_t cellFUGuid = cell->fuGuid();
179  UChar_t *cellPayloadAddr = cell->payloadAddr();
180  UInt_t cellEventSize = cell->eventSize();
181  //shmBuffer_->finishReadingRecoCell(cell);
182  lock();
184  resources_[cellRawIndex]->incNbSent();
185  if (resources_[cellRawIndex]->nbSent() == 1)
186  nbSent_++;
187  unlock();
188 
189  sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
190  cellOutModId, cellFUProcId, cellFUGuid,
191  cellPayloadAddr, cellEventSize);
192  //
193  // ERROR event message
194  //
195  } else if (cell->type() == 2) {
196  UInt_t cellIndex = cell->index();
197  UInt_t cellRawIndex = cell->rawCellIndex();
198  //UInt_t cellRunNumber = cell->runNumber();
199  UInt_t cellEvtNumber = cell->evtNumber();
200  UInt_t cellFUProcId = cell->fuProcessId();
201  UInt_t cellFUGuid = cell->fuGuid();
202  UChar_t *cellPayloadAddr = cell->payloadAddr();
203  UInt_t cellEventSize = cell->eventSize();
204  //shmBuffer_->finishReadingRecoCell(cell);
205 
206  lock();
208  resources_[cellRawIndex]->incNbSent();
209  if (resources_[cellRawIndex]->nbSent() == 1) {
210  nbSent_++;
211  nbSentError_++;
212  }
213  unlock();
214 
215  sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
216  cellFUProcId, cellFUGuid, cellPayloadAddr,
217  cellEventSize);
218  } else {
219  string errmsg =
220  "Unknown RecoCell type (neither INIT/DATA/ERROR).";
221  XCEPT_RAISE(evf::Exception, errmsg);
222  }
223  } catch (xcept::Exception& e) {
224  LOG4CPLUS_FATAL(
225  log_,
226  "Failed to send EVENT DATA to StorageManager: "
227  << xcept::stdformat_exception_history(e));
228  reschedule = false;
229  }
230  }
231 
232  return reschedule;
233 }
unsigned int index() const
Definition: FUShmRecoCell.h:22
unsigned int type() const
Definition: FUShmRecoCell.h:29
unsigned int runNumber() const
Definition: FUShmRecoCell.h:24
unsigned int fuGuid() const
Definition: FUShmRecoCell.h:28
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
log4cplus::Logger log_
Definition: IPCMethod.h:342
unsigned int fuProcessId() const
Definition: FUShmRecoCell.h:27
void sleep(Duration_t)
Definition: Utils.h:163
unsigned char * payloadAddr() const
unsigned int evtNumber() const
Definition: FUShmRecoCell.h:25
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
unsigned int nExpectedEPs() const
Definition: FUShmRecoCell.h:34
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
unsigned char UChar_t
Definition: FUTypes.h:14
UInt_t nbSentError_
Definition: IPCMethod.h:363
FUResourceVec_t resources_
Definition: IPCMethod.h:393
static const unsigned int RECO_MESSAGE_TYPE
Definition: msq_constants.h:15
UInt_t runNumber_
Definition: IPCMethod.h:384
unsigned int UInt_t
Definition: FUTypes.h:12
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:186
unsigned int outModId() const
Definition: FUShmRecoCell.h:26
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
UInt_t nbSent_
Definition: IPCMethod.h:362
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:171
void unlock()
Definition: IPCMethod.h:314
tuple cout
Definition: gather_cfg.py:121
UInt_t nbSent() const
Definition: IPCMethod.h:216
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
Definition: IPCMethod.cc:156
bool FUResourceQueue::sendDataWhileHalting ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 236 of file FUResourceQueue.cc.

References gather_cfg::cout, alignCSCRings::e, evf::FUShmRecoCell::eventSize(), evf::FUShmRecoCell::evtNumber(), edm::hlt::Exception, evf::FUShmRecoCell::fuGuid(), evf::FUShmRecoCell::fuProcessId(), evf::FUShmRecoCell::index(), evf::IPCMethod::lock(), evf::IPCMethod::log_, msq_, evf::IPCMethod::nbPendingSMDiscards_, evf::IPCMethod::nbSent(), evf::IPCMethod::nbSent_, evf::IPCMethod::nbSentError_, evf::FUShmRecoCell::nExpectedEPs(), evf::FUShmRecoCell::outModId(), evf::FUShmRecoCell::payloadAddr(), evf::FUShmRecoCell::rawCellIndex(), evf::MasterQueue::rcvQuiet(), evf::RECO_MESSAGE_TYPE, evf::RecoMsgBuf::recoCell(), recoCellSize_, evf::IPCMethod::resources_, evf::FUShmRecoCell::runNumber(), evf::IPCMethod::runNumber_, evf::IPCMethod::sendDataEvent(), evf::IPCMethod::sendErrorEvent(), evf::IPCMethod::sendInitMessage(), stor::utils::sleep(), evf::FUShmRecoCell::type(), and evf::IPCMethod::unlock().

236  {
237  bool reschedule = true;
238 
239  //FUShmRecoCell* cell = shmBuffer_->recoCellToRead();
241 
242  bool rcvSuccess = msq_.rcvQuiet(recoMsg);
243  if (!rcvSuccess) {
244  cout << "RCV failed!" << endl;
245  ::sleep(5);
246  return reschedule;
247  }
248 
249  FUShmRecoCell* cell = recoMsg.recoCell();
250 
251  // event size 0 -> stop
252 
253  if (0 == cell->eventSize()) {
254  LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
255  //UInt_t cellIndex = cell->index();
256  /*
257  shmBuffer_->finishReadingRecoCell(cell);
258  shmBuffer_->discardRecoCell(cellIndex);
259  */
260  reschedule = false;
261 
262  // halting
263  } else if (/*isHalting_*/true) {
264  LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
265  //UInt_t cellIndex = cell->index();
266  /*
267  shmBuffer_->finishReadingRecoCell(cell);
268  shmBuffer_->discardRecoCell(cellIndex);
269  */
270 
271  } else {
272  try {
273  //init message
274  if (cell->type() == 0) {
275  UInt_t cellIndex = cell->index();
276  UInt_t cellOutModId = cell->outModId();
277  UInt_t cellFUProcId = cell->fuProcessId();
278  UInt_t cellFUGuid = cell->fuGuid();
279  UChar_t* cellPayloadAddr = cell->payloadAddr();
280  UInt_t cellEventSize = cell->eventSize();
281  UInt_t cellExpectedEPs = cell->nExpectedEPs();
282  //shmBuffer_->finishReadingRecoCell(cell);
283 
284  lock();
286  unlock();
287 
288  sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
289  cellFUGuid, cellPayloadAddr, cellEventSize,
290  cellExpectedEPs);
291 
292  //
293  // DATA event message
294  //
295  } else if (cell->type() == 1) {
296  UInt_t cellIndex = cell->index();
297  UInt_t cellRawIndex = cell->rawCellIndex();
298  UInt_t cellRunNumber = cell->runNumber();
299  UInt_t cellEvtNumber = cell->evtNumber();
300  UInt_t cellOutModId = cell->outModId();
301  UInt_t cellFUProcId = cell->fuProcessId();
302  UInt_t cellFUGuid = cell->fuGuid();
303  UChar_t *cellPayloadAddr = cell->payloadAddr();
304  UInt_t cellEventSize = cell->eventSize();
305 
306  //shmBuffer_->finishReadingRecoCell(cell);
307  lock();
309  resources_[cellRawIndex]->incNbSent();
310  if (resources_[cellRawIndex]->nbSent() == 1)
311  nbSent_++;
312  unlock();
313 
314  sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
315  cellOutModId, cellFUProcId, cellFUGuid,
316  cellPayloadAddr, cellEventSize);
317  //
318  // ERROR event message
319  //
320  } else if (cell->type() == 2) {
321  UInt_t cellIndex = cell->index();
322  UInt_t cellRawIndex = cell->rawCellIndex();
323  //UInt_t cellRunNumber = cell->runNumber();
324  UInt_t cellEvtNumber = cell->evtNumber();
325  UInt_t cellFUProcId = cell->fuProcessId();
326  UInt_t cellFUGuid = cell->fuGuid();
327  UChar_t *cellPayloadAddr = cell->payloadAddr();
328  UInt_t cellEventSize = cell->eventSize();
329  //shmBuffer_->finishReadingRecoCell(cell);
330 
331  lock();
333  resources_[cellRawIndex]->incNbSent();
334  if (resources_[cellRawIndex]->nbSent() == 1) {
335  nbSent_++;
336  nbSentError_++;
337  }
338  unlock();
339 
340  sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
341  cellFUProcId, cellFUGuid, cellPayloadAddr,
342  cellEventSize);
343  } else {
344  string errmsg =
345  "Unknown RecoCell type (neither INIT/DATA/ERROR).";
346  XCEPT_RAISE(evf::Exception, errmsg);
347  }
348  } catch (xcept::Exception& e) {
349  LOG4CPLUS_FATAL(
350  log_,
351  "Failed to send EVENT DATA to StorageManager: "
352  << xcept::stdformat_exception_history(e));
353  reschedule = false;
354  }
355  }
356 
357  return reschedule;
358 }
unsigned int index() const
Definition: FUShmRecoCell.h:22
unsigned int type() const
Definition: FUShmRecoCell.h:29
unsigned int runNumber() const
Definition: FUShmRecoCell.h:24
unsigned int fuGuid() const
Definition: FUShmRecoCell.h:28
unsigned int eventSize() const
Definition: FUShmRecoCell.h:33
log4cplus::Logger log_
Definition: IPCMethod.h:342
unsigned int fuProcessId() const
Definition: FUShmRecoCell.h:27
void sleep(Duration_t)
Definition: Utils.h:163
unsigned char * payloadAddr() const
unsigned int evtNumber() const
Definition: FUShmRecoCell.h:25
UInt_t nbPendingSMDiscards_
Definition: IPCMethod.h:365
unsigned int nExpectedEPs() const
Definition: FUShmRecoCell.h:34
unsigned int rawCellIndex() const
Definition: FUShmRecoCell.h:23
unsigned char UChar_t
Definition: FUTypes.h:14
UInt_t nbSentError_
Definition: IPCMethod.h:363
FUResourceVec_t resources_
Definition: IPCMethod.h:393
static const unsigned int RECO_MESSAGE_TYPE
Definition: msq_constants.h:15
UInt_t runNumber_
Definition: IPCMethod.h:384
unsigned int UInt_t
Definition: FUTypes.h:12
void sendErrorEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:186
unsigned int outModId() const
Definition: FUShmRecoCell.h:26
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
UInt_t nbSent_
Definition: IPCMethod.h:362
void sendDataEvent(UInt_t fuResourceId, UInt_t runNumber, UInt_t evtNumber, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:171
void unlock()
Definition: IPCMethod.h:314
tuple cout
Definition: gather_cfg.py:121
UInt_t nbSent() const
Definition: IPCMethod.h:216
void sendInitMessage(UInt_t fuResourceId, UInt_t outModId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize, UInt_t nExpectedEPs)
Definition: IPCMethod.cc:156
bool FUResourceQueue::sendDqm ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 362 of file FUResourceQueue.cc.

References gather_cfg::cout, evf::DQM_MESSAGE_TYPE, evf::DQMMsgBuf::dqmCell(), dqmCellSize_, alignCSCRings::e, evf::FUShmDqmCell::eventSize(), evf::FUShmDqmCell::evtAtUpdate(), edm::hlt::Exception, evf::FUShmDqmCell::folderId(), evf::FUShmDqmCell::fuGuid(), evf::FUShmDqmCell::fuProcessId(), evf::FUShmDqmCell::index(), evf::IPCMethod::log_, msq_, evf::FUShmDqmCell::payloadAddr(), evf::MasterQueue::rcvQuiet(), evf::FUShmDqmCell::runNumber(), evf::IPCMethod::sendDqmEvent(), and stor::utils::sleep().

362  {
363  bool reschedule = true;
364 
365  //FUShmDqmCell* cell = shmBuffer_->dqmCellToRead();
366  //dqm::State_t state = shmBuffer_->dqmState(cell->index());
367 
369 
370  bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
371  if (!rcvSuccess) {
372  cout << "RCV failed!" << endl;
373  ::sleep(5);
374  return reschedule;
375  }
376  FUShmDqmCell* cell = dqmMsg.dqmCell();
377 
378  // concept add stop messages (there is no more "state")
379  //if (state == dqm::EMPTY) {
380  if (false) {
381  LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
382  cout << "shut down dqm workloop " << endl;
383  //UInt_t cellIndex = cell->index();
384  /*
385  shmBuffer_->finishReadingDqmCell(cell);
386  shmBuffer_->discardDqmCell(cellIndex);
387  */
388  reschedule = false;
389  } else if (/*isHalting_*/false) {
390  //UInt_t cellIndex = cell->index();
391  /*
392  shmBuffer_->finishReadingDqmCell(cell);
393  shmBuffer_->discardDqmCell(cellIndex);
394  */
395  } else {
396  try {
397  UInt_t cellIndex = cell->index();
398  UInt_t cellRunNumber = cell->runNumber();
399  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
400  UInt_t cellFolderId = cell->folderId();
401  UInt_t cellFUProcId = cell->fuProcessId();
402  UInt_t cellFUGuid = cell->fuGuid();
403  UChar_t *cellPayloadAddr = cell->payloadAddr();
404  UInt_t cellEventSize = cell->eventSize();
405  sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
406  cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
407  cellEventSize);
408  //shmBuffer_->finishReadingDqmCell(cell);
409  } catch (xcept::Exception& e) {
410  LOG4CPLUS_FATAL(
411  log_,
412  "Failed to send DQM DATA to StorageManager: "
413  << xcept::stdformat_exception_history(e));
414  reschedule = false;
415  }
416  }
417 
418  return reschedule;
419 }
unsigned int fuGuid() const
Definition: FUShmDqmCell.h:27
unsigned int index() const
Definition: FUShmDqmCell.h:22
unsigned int folderId() const
Definition: FUShmDqmCell.h:25
unsigned int evtAtUpdate() const
Definition: FUShmDqmCell.h:24
log4cplus::Logger log_
Definition: IPCMethod.h:342
void sleep(Duration_t)
Definition: Utils.h:163
unsigned int fuProcessId() const
Definition: FUShmDqmCell.h:26
unsigned int runNumber() const
Definition: FUShmDqmCell.h:23
unsigned char UChar_t
Definition: FUTypes.h:14
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:224
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned char * payloadAddr() const
Definition: FUShmDqmCell.cc:54
unsigned int eventSize() const
Definition: FUShmDqmCell.h:31
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
tuple cout
Definition: gather_cfg.py:121
static const unsigned int DQM_MESSAGE_TYPE
Definition: msq_constants.h:16
bool FUResourceQueue::sendDqmWhileHalting ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 422 of file FUResourceQueue.cc.

References gather_cfg::cout, evf::DQM_MESSAGE_TYPE, evf::DQMMsgBuf::dqmCell(), dqmCellSize_, alignCSCRings::e, evf::FUShmDqmCell::eventSize(), evf::FUShmDqmCell::evtAtUpdate(), edm::hlt::Exception, evf::FUShmDqmCell::folderId(), evf::FUShmDqmCell::fuGuid(), evf::FUShmDqmCell::fuProcessId(), evf::FUShmDqmCell::index(), evf::IPCMethod::log_, msq_, evf::FUShmDqmCell::payloadAddr(), evf::MasterQueue::rcvQuiet(), evf::FUShmDqmCell::runNumber(), evf::IPCMethod::sendDqmEvent(), and stor::utils::sleep().

422  {
423  bool reschedule = true;
424 
425  //FUShmDqmCell* cell = shmBuffer_->dqmCellToRead();
426  //dqm::State_t state = shmBuffer_->dqmState(cell->index());
427 
429 
430  bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
431  if (!rcvSuccess) {
432  cout << "RCV failed!" << endl;
433  ::sleep(5);
434  return reschedule;
435  }
436  FUShmDqmCell* cell = dqmMsg.dqmCell();
437 
438  // concept add stop messages (there is no more "state")
439  //if (state == dqm::EMPTY) {
440  if (false) {
441  LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
442  cout << "shut down dqm workloop " << endl;
443  //UInt_t cellIndex = cell->index();
444  /*
445  shmBuffer_->finishReadingDqmCell(cell);
446  shmBuffer_->discardDqmCell(cellIndex);
447  */
448  reschedule = false;
449  } else if (/*isHalting_*/true) {
450  //UInt_t cellIndex = cell->index();
451  /*
452  shmBuffer_->finishReadingDqmCell(cell);
453  shmBuffer_->discardDqmCell(cellIndex);
454  */
455  } else {
456  try {
457  UInt_t cellIndex = cell->index();
458  UInt_t cellRunNumber = cell->runNumber();
459  UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
460  UInt_t cellFolderId = cell->folderId();
461  UInt_t cellFUProcId = cell->fuProcessId();
462  UInt_t cellFUGuid = cell->fuGuid();
463  UChar_t *cellPayloadAddr = cell->payloadAddr();
464  UInt_t cellEventSize = cell->eventSize();
465  sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
466  cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
467  cellEventSize);
468  //shmBuffer_->finishReadingDqmCell(cell);
469  } catch (xcept::Exception& e) {
470  LOG4CPLUS_FATAL(
471  log_,
472  "Failed to send DQM DATA to StorageManager: "
473  << xcept::stdformat_exception_history(e));
474  reschedule = false;
475  }
476  }
477 
478  return reschedule;
479 }
unsigned int fuGuid() const
Definition: FUShmDqmCell.h:27
unsigned int index() const
Definition: FUShmDqmCell.h:22
unsigned int folderId() const
Definition: FUShmDqmCell.h:25
unsigned int evtAtUpdate() const
Definition: FUShmDqmCell.h:24
log4cplus::Logger log_
Definition: IPCMethod.h:342
void sleep(Duration_t)
Definition: Utils.h:163
unsigned int fuProcessId() const
Definition: FUShmDqmCell.h:26
unsigned int runNumber() const
Definition: FUShmDqmCell.h:23
unsigned char UChar_t
Definition: FUTypes.h:14
void sendDqmEvent(UInt_t fuDqmId, UInt_t runNumber, UInt_t evtAtUpdate, UInt_t folderId, UInt_t fuProcessId, UInt_t fuGuid, UChar_t *data, UInt_t dataSize)
Definition: IPCMethod.cc:224
unsigned int UInt_t
Definition: FUTypes.h:12
unsigned char * payloadAddr() const
Definition: FUShmDqmCell.cc:54
unsigned int eventSize() const
Definition: FUShmDqmCell.h:31
bool rcvQuiet(MsgBuf &ptr)
Definition: MasterQueue.cc:74
tuple cout
Definition: gather_cfg.py:121
static const unsigned int DQM_MESSAGE_TYPE
Definition: msq_constants.h:16
void FUResourceQueue::shutDownClients ( )
virtual

Has to be implemented by subclasses, according to IPC type.

Implements evf::IPCMethod.

Definition at line 1096 of file FUResourceQueue.cc.

References evf::IPCMethod::isReadyToShutDown_.

1096  {
1097  isReadyToShutDown_ = true;
1098  /*
1099  nbClientsToShutDown_ = nbClients();
1100  isReadyToShutDown_ = false;
1101 
1102  if (nbClientsToShutDown_ == 0) {
1103  LOG4CPLUS_INFO(
1104  log_,
1105  "No clients to shut down. Checking if there are raw cells not assigned to any process yet");
1106  UInt_t n = nbResources();
1107  for (UInt_t i = 0; i < n; i++) {
1108  evt::State_t state = shmBuffer_->evtState(i);
1109  if (state != evt::EMPTY) {
1110  LOG4CPLUS_WARN(
1111  log_,
1112  "Schedule discard at STOP for orphaned event in state "
1113  << state);
1114  shmBuffer_->scheduleRawCellForDiscardServerSide(i);
1115  }
1116  }
1117  shmBuffer_->scheduleRawEmptyCellForDiscard();
1118  } else {
1119  UInt_t n = nbClientsToShutDown_;
1120  for (UInt_t i = 0; i < n; ++i)
1121  shmBuffer_->writeRawEmptyEvent();
1122  }
1123  */
1124 }
bool isReadyToShutDown_
Definition: IPCMethod.h:374

Member Data Documentation

RawCache* evf::FUResourceQueue::cache_
private

Definition at line 124 of file FUResourceQueue.h.

Referenced by buildResource().

UInt_t evf::FUResourceQueue::dqmCellSize_
private

Definition at line 127 of file FUResourceQueue.h.

Referenced by sendDqm(), and sendDqmWhileHalting().

MasterQueue evf::FUResourceQueue::msq_
private
UInt_t evf::FUResourceQueue::rawCellSize_
private

Definition at line 127 of file FUResourceQueue.h.

UInt_t evf::FUResourceQueue::recoCellSize_
private

Definition at line 127 of file FUResourceQueue.h.

Referenced by sendData(), and sendDataWhileHalting().