00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00012
00013 #include "EventFilter/ResourceBroker/interface/FUResourceQueue.h"
00014 #include "EventFilter/ResourceBroker/interface/RecoMsgBuf.h"
00015 #include "EventFilter/ResourceBroker/interface/DQMMsgBuf.h"
00016 #include "EventFilter/ResourceBroker/interface/msq_constants.h"
00017 #include "EventFilter/ShmBuffer/interface/FUShmDqmCell.h"
00018
00019 #include "EvffedFillerRB.h"
00020 #include "interface/evb/i2oEVBMsgs.h"
00021 #include "xcept/tools.h"
00022
00023 #include <fstream>
00024 #include <sstream>
00025 #include <iomanip>
00026 #include <unistd.h>
00027
00028 using std::cout;
00029 using std::endl;
00030 using std::string;
00031 using std::stringstream;
00032 using std::vector;
00033 using namespace evf;
00034
00036
00038
00039
00040 FUResourceQueue::FUResourceQueue(bool segmentationMode, UInt_t nbRawCells,
00041 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00042 UInt_t recoCellSize, UInt_t dqmCellSize, int freeResReq, BUProxy *bu,
00043 SMProxy *sm, log4cplus::Logger logger, unsigned int timeout,
00044 EvffedFillerRB *frb, xdaq::Application*app) throw (evf::Exception) :
00045 IPCMethod(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00046 rawCellSize, recoCellSize, dqmCellSize, freeResReq, bu, sm,
00047 logger, timeout, frb, app), msq_(99) {
00048
00049
00050 initialize(segmentationMode, nbRawCells, nbRecoCells, nbDqmCells,
00051 rawCellSize, recoCellSize, dqmCellSize);
00052
00053 }
00054
00055
00056 FUResourceQueue::~FUResourceQueue() {
00057 clear();
00058
00059
00060 if (msq_.disconnect() == 0)
00061 LOG4CPLUS_INFO(log_, "MESSAGE QUEUE SUCCESSFULLY RELEASED.");
00062
00063
00064
00065
00066
00067
00068
00069
00070 }
00071
00073
00075
00076
00077 void FUResourceQueue::initialize(bool segmentationMode, UInt_t nbRawCells,
00078 UInt_t nbRecoCells, UInt_t nbDqmCells, UInt_t rawCellSize,
00079 UInt_t recoCellSize, UInt_t dqmCellSize) throw (evf::Exception) {
00080
00081 rawCellSize_ = rawCellSize;
00082 recoCellSize_ = recoCellSize;
00083 dqmCellSize_ = dqmCellSize;
00084
00085 clear();
00086
00087 if (0 == &msq_ || 0 == msq_.id()) {
00088 string msg = "CREATION OF MESSAGE QUEUE FAILED!";
00089 LOG4CPLUS_FATAL(log_, msg);
00090 XCEPT_RAISE(evf::Exception, msg);
00091 }
00092
00093 cache_ = RawCache::getInstance();
00094 cache_->initialise(nbRawCells, rawCellSize);
00095
00096
00097 for (UInt_t i = 0; i < nbRawCells_; i++) {
00098 FUResource* newResource = new FUResource(i, log_, frb_, app_);
00099 newResource->release(false);
00100 resources_.push_back(newResource);
00101 freeResourceIds_.push(i);
00102 }
00103
00104
00105
00106
00107 resetCounters();
00108 }
00109
00110
00111
00112 bool FUResourceQueue::sendData() {
00113 bool reschedule = true;
00114
00115
00116 RecoMsgBuf recoMsg(recoCellSize_, RECO_MESSAGE_TYPE);
00117
00118 bool rcvSuccess = msq_.rcvQuiet(recoMsg);
00119 if (!rcvSuccess) {
00120 cout << "RCV failed!" << endl;
00121 ::sleep(5);
00122 return reschedule;
00123 }
00124
00125 FUShmRecoCell* cell = recoMsg.recoCell();
00126
00127
00128
00129 if (0 == cell->eventSize()) {
00130 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00131
00132
00133
00134
00135
00136 reschedule = false;
00137
00138
00139 } else if (false) {
00140 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00141
00142
00143
00144
00145
00146
00147 } else {
00148 try {
00149
00150 if (cell->type() == 0) {
00151 UInt_t cellIndex = cell->index();
00152 UInt_t cellOutModId = cell->outModId();
00153 UInt_t cellFUProcId = cell->fuProcessId();
00154 UInt_t cellFUGuid = cell->fuGuid();
00155 UChar_t* cellPayloadAddr = cell->payloadAddr();
00156 UInt_t cellEventSize = cell->eventSize();
00157 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00158
00159
00160 lock();
00161 nbPendingSMDiscards_++;
00162 unlock();
00163
00164 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00165 cellFUGuid, cellPayloadAddr, cellEventSize,
00166 cellExpectedEPs);
00167
00168
00169
00170
00171 } else if (cell->type() == 1) {
00172 UInt_t cellIndex = cell->index();
00173 UInt_t cellRawIndex = cell->rawCellIndex();
00174 UInt_t cellRunNumber = cell->runNumber();
00175 UInt_t cellEvtNumber = cell->evtNumber();
00176 UInt_t cellOutModId = cell->outModId();
00177 UInt_t cellFUProcId = cell->fuProcessId();
00178 UInt_t cellFUGuid = cell->fuGuid();
00179 UChar_t *cellPayloadAddr = cell->payloadAddr();
00180 UInt_t cellEventSize = cell->eventSize();
00181
00182 lock();
00183 nbPendingSMDiscards_++;
00184 resources_[cellRawIndex]->incNbSent();
00185 if (resources_[cellRawIndex]->nbSent() == 1)
00186 nbSent_++;
00187 unlock();
00188
00189 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00190 cellOutModId, cellFUProcId, cellFUGuid,
00191 cellPayloadAddr, cellEventSize);
00192
00193
00194
00195 } else if (cell->type() == 2) {
00196 UInt_t cellIndex = cell->index();
00197 UInt_t cellRawIndex = cell->rawCellIndex();
00198
00199 UInt_t cellEvtNumber = cell->evtNumber();
00200 UInt_t cellFUProcId = cell->fuProcessId();
00201 UInt_t cellFUGuid = cell->fuGuid();
00202 UChar_t *cellPayloadAddr = cell->payloadAddr();
00203 UInt_t cellEventSize = cell->eventSize();
00204
00205
00206 lock();
00207 nbPendingSMDiscards_++;
00208 resources_[cellRawIndex]->incNbSent();
00209 if (resources_[cellRawIndex]->nbSent() == 1) {
00210 nbSent_++;
00211 nbSentError_++;
00212 }
00213 unlock();
00214
00215 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00216 cellFUProcId, cellFUGuid, cellPayloadAddr,
00217 cellEventSize);
00218 } else {
00219 string errmsg =
00220 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00221 XCEPT_RAISE(evf::Exception, errmsg);
00222 }
00223 } catch (xcept::Exception& e) {
00224 LOG4CPLUS_FATAL(
00225 log_,
00226 "Failed to send EVENT DATA to StorageManager: "
00227 << xcept::stdformat_exception_history(e));
00228 reschedule = false;
00229 }
00230 }
00231
00232 return reschedule;
00233 }
00234
00235
00236 bool FUResourceQueue::sendDataWhileHalting() {
00237 bool reschedule = true;
00238
00239
00240 RecoMsgBuf recoMsg(recoCellSize_, RECO_MESSAGE_TYPE);
00241
00242 bool rcvSuccess = msq_.rcvQuiet(recoMsg);
00243 if (!rcvSuccess) {
00244 cout << "RCV failed!" << endl;
00245 ::sleep(5);
00246 return reschedule;
00247 }
00248
00249 FUShmRecoCell* cell = recoMsg.recoCell();
00250
00251
00252
00253 if (0 == cell->eventSize()) {
00254 LOG4CPLUS_INFO(log_, "Don't reschedule sendData workloop.");
00255
00256
00257
00258
00259
00260 reschedule = false;
00261
00262
00263 } else if (true) {
00264 LOG4CPLUS_INFO(log_, "sendData: isHalting, discard recoCell.");
00265
00266
00267
00268
00269
00270
00271 } else {
00272 try {
00273
00274 if (cell->type() == 0) {
00275 UInt_t cellIndex = cell->index();
00276 UInt_t cellOutModId = cell->outModId();
00277 UInt_t cellFUProcId = cell->fuProcessId();
00278 UInt_t cellFUGuid = cell->fuGuid();
00279 UChar_t* cellPayloadAddr = cell->payloadAddr();
00280 UInt_t cellEventSize = cell->eventSize();
00281 UInt_t cellExpectedEPs = cell->nExpectedEPs();
00282
00283
00284 lock();
00285 nbPendingSMDiscards_++;
00286 unlock();
00287
00288 sendInitMessage(cellIndex, cellOutModId, cellFUProcId,
00289 cellFUGuid, cellPayloadAddr, cellEventSize,
00290 cellExpectedEPs);
00291
00292
00293
00294
00295 } else if (cell->type() == 1) {
00296 UInt_t cellIndex = cell->index();
00297 UInt_t cellRawIndex = cell->rawCellIndex();
00298 UInt_t cellRunNumber = cell->runNumber();
00299 UInt_t cellEvtNumber = cell->evtNumber();
00300 UInt_t cellOutModId = cell->outModId();
00301 UInt_t cellFUProcId = cell->fuProcessId();
00302 UInt_t cellFUGuid = cell->fuGuid();
00303 UChar_t *cellPayloadAddr = cell->payloadAddr();
00304 UInt_t cellEventSize = cell->eventSize();
00305
00306
00307 lock();
00308 nbPendingSMDiscards_++;
00309 resources_[cellRawIndex]->incNbSent();
00310 if (resources_[cellRawIndex]->nbSent() == 1)
00311 nbSent_++;
00312 unlock();
00313
00314 sendDataEvent(cellIndex, cellRunNumber, cellEvtNumber,
00315 cellOutModId, cellFUProcId, cellFUGuid,
00316 cellPayloadAddr, cellEventSize);
00317
00318
00319
00320 } else if (cell->type() == 2) {
00321 UInt_t cellIndex = cell->index();
00322 UInt_t cellRawIndex = cell->rawCellIndex();
00323
00324 UInt_t cellEvtNumber = cell->evtNumber();
00325 UInt_t cellFUProcId = cell->fuProcessId();
00326 UInt_t cellFUGuid = cell->fuGuid();
00327 UChar_t *cellPayloadAddr = cell->payloadAddr();
00328 UInt_t cellEventSize = cell->eventSize();
00329
00330
00331 lock();
00332 nbPendingSMDiscards_++;
00333 resources_[cellRawIndex]->incNbSent();
00334 if (resources_[cellRawIndex]->nbSent() == 1) {
00335 nbSent_++;
00336 nbSentError_++;
00337 }
00338 unlock();
00339
00340 sendErrorEvent(cellIndex, runNumber_, cellEvtNumber,
00341 cellFUProcId, cellFUGuid, cellPayloadAddr,
00342 cellEventSize);
00343 } else {
00344 string errmsg =
00345 "Unknown RecoCell type (neither INIT/DATA/ERROR).";
00346 XCEPT_RAISE(evf::Exception, errmsg);
00347 }
00348 } catch (xcept::Exception& e) {
00349 LOG4CPLUS_FATAL(
00350 log_,
00351 "Failed to send EVENT DATA to StorageManager: "
00352 << xcept::stdformat_exception_history(e));
00353 reschedule = false;
00354 }
00355 }
00356
00357 return reschedule;
00358 }
00359
00360
00361
00362 bool FUResourceQueue::sendDqm() {
00363 bool reschedule = true;
00364
00365
00366
00367
00368 DQMMsgBuf dqmMsg(dqmCellSize_, DQM_MESSAGE_TYPE);
00369
00370 bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
00371 if (!rcvSuccess) {
00372 cout << "RCV failed!" << endl;
00373 ::sleep(5);
00374 return reschedule;
00375 }
00376 FUShmDqmCell* cell = dqmMsg.dqmCell();
00377
00378
00379
00380 if (false) {
00381 LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
00382 cout << "shut down dqm workloop " << endl;
00383
00384
00385
00386
00387
00388 reschedule = false;
00389 } else if (false) {
00390
00391
00392
00393
00394
00395 } else {
00396 try {
00397 UInt_t cellIndex = cell->index();
00398 UInt_t cellRunNumber = cell->runNumber();
00399 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00400 UInt_t cellFolderId = cell->folderId();
00401 UInt_t cellFUProcId = cell->fuProcessId();
00402 UInt_t cellFUGuid = cell->fuGuid();
00403 UChar_t *cellPayloadAddr = cell->payloadAddr();
00404 UInt_t cellEventSize = cell->eventSize();
00405 sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00406 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00407 cellEventSize);
00408
00409 } catch (xcept::Exception& e) {
00410 LOG4CPLUS_FATAL(
00411 log_,
00412 "Failed to send DQM DATA to StorageManager: "
00413 << xcept::stdformat_exception_history(e));
00414 reschedule = false;
00415 }
00416 }
00417
00418 return reschedule;
00419 }
00420
00421
00422 bool FUResourceQueue::sendDqmWhileHalting() {
00423 bool reschedule = true;
00424
00425
00426
00427
00428 DQMMsgBuf dqmMsg(dqmCellSize_, DQM_MESSAGE_TYPE);
00429
00430 bool rcvSuccess = msq_.rcvQuiet(dqmMsg);
00431 if (!rcvSuccess) {
00432 cout << "RCV failed!" << endl;
00433 ::sleep(5);
00434 return reschedule;
00435 }
00436 FUShmDqmCell* cell = dqmMsg.dqmCell();
00437
00438
00439
00440 if (false) {
00441 LOG4CPLUS_WARN(log_, "Don't reschedule sendDqm workloop.");
00442 cout << "shut down dqm workloop " << endl;
00443
00444
00445
00446
00447
00448 reschedule = false;
00449 } else if (true) {
00450
00451
00452
00453
00454
00455 } else {
00456 try {
00457 UInt_t cellIndex = cell->index();
00458 UInt_t cellRunNumber = cell->runNumber();
00459 UInt_t cellEvtAtUpdate = cell->evtAtUpdate();
00460 UInt_t cellFolderId = cell->folderId();
00461 UInt_t cellFUProcId = cell->fuProcessId();
00462 UInt_t cellFUGuid = cell->fuGuid();
00463 UChar_t *cellPayloadAddr = cell->payloadAddr();
00464 UInt_t cellEventSize = cell->eventSize();
00465 sendDqmEvent(cellIndex, cellRunNumber, cellEvtAtUpdate,
00466 cellFolderId, cellFUProcId, cellFUGuid, cellPayloadAddr,
00467 cellEventSize);
00468
00469 } catch (xcept::Exception& e) {
00470 LOG4CPLUS_FATAL(
00471 log_,
00472 "Failed to send DQM DATA to StorageManager: "
00473 << xcept::stdformat_exception_history(e));
00474 reschedule = false;
00475 }
00476 }
00477
00478 return reschedule;
00479 }
00480
00481
00482
00483 bool FUResourceQueue::discard() {
00484
00485 bool reschedule = true;
00486
00487
00488
00489
00490 MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
00491 bool rcvSuccess = msq_.rcvQuiet(discardRaw);
00492
00493 if (!rcvSuccess) {
00494 cout << "RCV failed!" << endl;
00495 ::sleep(5);
00496 return reschedule;
00497 }
00498
00499 unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
00500 unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
00501 + sizeof(unsigned int));
00502
00503 unsigned int buResourceId = *pBuID;
00504 unsigned int fuResourceId = *pFuID;
00505
00506 cout << "Discard received for buResourceID: " << buResourceId
00507 << " fuResourceID " << fuResourceId << endl << endl;
00508
00509
00510
00511
00512
00513
00514
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536
00537
00538
00539 if (true) {
00540
00541 resources_[fuResourceId]->release(false);
00542
00543 RawCache::getInstance()->releaseMsg(fuResourceId);
00544
00545 lock();
00546 freeResourceIds_.push(fuResourceId);
00547 assert(freeResourceIds_.size() <= resources_.size());
00548 unlock();
00549
00550 if (true) {
00551 sendDiscard(buResourceId);
00552 if (true)
00553 sendAllocate();
00554 }
00555 }
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621 return reschedule;
00622 }
00623
00624
00625 bool FUResourceQueue::discardWhileHalting(bool sendDiscards) {
00626
00627 bool reschedule = true;
00628
00629
00630
00631
00632 MsgBuf discardRaw(2 * sizeof(unsigned int), DISCARD_RAW_MESSAGE_TYPE);
00633 bool rcvSuccess = msq_.rcvQuiet(discardRaw);
00634
00635 if (!rcvSuccess) {
00636 cout << "RCV failed!" << endl;
00637 ::sleep(5);
00638 return reschedule;
00639 }
00640
00641 unsigned int* pBuID = (unsigned int*) discardRaw->mtext;
00642 unsigned int* pFuID = (unsigned int*) (discardRaw->mtext
00643 + sizeof(unsigned int));
00644
00645 unsigned int buResourceId = *pBuID;
00646 unsigned int fuResourceId = *pFuID;
00647
00648 cout << "Discard received for buResourceID: " << buResourceId
00649 << " fuResourceID " << fuResourceId << endl << endl;
00650
00651
00652
00653
00654
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681 if (true) {
00682
00683 resources_[fuResourceId]->release(false);
00684
00685 RawCache::getInstance()->releaseMsg(fuResourceId);
00686
00687 lock();
00688 freeResourceIds_.push(fuResourceId);
00689 assert(freeResourceIds_.size() <= resources_.size());
00690 unlock();
00691
00692 if (false) {
00693 sendDiscard(buResourceId);
00694 if (false)
00695 sendAllocate();
00696 }
00697 }
00698
00699
00700
00701
00702
00703
00704
00705
00706
00707
00708
00709
00710
00711
00712
00713
00714
00715
00716
00717
00718
00719
00720
00721
00722
00723
00724
00725
00726
00727
00728
00729
00730
00731
00732
00733
00734
00735
00736
00737
00738
00739
00740
00741
00742
00743
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763 return reschedule;
00764 }
00765
00766
00767
00768
00769
00770
00771 bool FUResourceQueue::buildResource(MemRef_t* bufRef) {
00772
00773 bool eventComplete = false;
00774
00775 I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME *block =
00776 (I2O_EVENT_DATA_BLOCK_MESSAGE_FRAME*) bufRef->getDataLocation();
00777
00778 UInt_t fuResourceId = (UInt_t) block->fuTransactionId;
00779 UInt_t buResourceId = (UInt_t) block->buResourceId;
00780
00781 FUResource* resource = resources_[fuResourceId];
00782
00783 RawMsgBuf* currentMessageToWrite = cache_->getMsgToWrite();
00784
00785 if (!resource->fatalError() && !resource->isAllocated()) {
00786 FUShmRawCell* rawCell = currentMessageToWrite->rawCell();
00787 rawCell->initialize(fuResourceId);
00788 resource->allocate(rawCell);
00789
00790 timeval now;
00791 gettimeofday(&now, 0);
00792
00793 frb_->setRBTimeStamp(
00794 ((uint64_t) (now.tv_sec) << 32) + (uint64_t) (now.tv_usec));
00795
00796 frb_->setRBEventCount(nbCompleted_);
00797
00798 if (doCrcCheck_ > 0 && 0 == nbAllocated_ % doCrcCheck_)
00799 resource->doCrcCheck(true);
00800 else
00801 resource->doCrcCheck(false);
00802 }
00803
00804
00805 if (!resource->fatalError()) {
00806 resource->process(bufRef);
00807
00808 lock();
00809 nbErrors_ += resource->nbErrors();
00810 nbCrcErrors_ += resource->nbCrcErrors();
00811 unlock();
00812
00813
00814 if (resource->isComplete()) {
00815 lock();
00816 nbCompleted_++;
00817 nbPending_--;
00818 unlock();
00819
00820
00821
00822
00823
00824
00825
00826
00827
00828
00829
00830
00831
00832 try {
00833 msq_.postLength(*currentMessageToWrite,
00834 currentMessageToWrite->usedSize());
00835 } catch (...) {
00836 string errmsg = "Failed to post message to Queue!";
00837 LOG4CPLUS_FATAL(log_, errmsg);
00838 XCEPT_RAISE(evf::Exception, errmsg);
00839
00840 }
00841
00842
00843
00844
00845
00846
00847
00848
00849
00850
00851 eventComplete = true;
00852 }
00853
00854 }
00855
00856 if (resource->fatalError()) {
00857 bool lastMsg = isLastMessageOfEvent(bufRef);
00858 if (lastMsg) {
00859 resource->release(false);
00860 lock();
00861 freeResourceIds_.push(fuResourceId);
00862 nbDiscarded_++;
00863 nbLost_++;
00864 nbPending_--;
00865 unlock();
00866 bu_->sendDiscard(buResourceId);
00867 sendAllocate();
00868 }
00869 bufRef->release();
00870 }
00871
00872 return eventComplete;
00873 }
00874
00875
00876
00877
00878 bool FUResourceQueue::discardDataEvent(MemRef_t* bufRef) {
00879
00880
00881
00882
00883
00884
00885
00886
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904 return true;
00905 }
00906
00907
00908
00909 bool FUResourceQueue::discardDataEventWhileHalting(MemRef_t* bufRef) {
00910
00911
00912
00913
00914
00915
00916
00917
00918
00919
00920
00921
00922
00923
00924
00925
00926
00927
00928
00929
00930
00931
00932
00933
00934
00935 return true;
00936 }
00937
00938
00939
00940
00941 bool FUResourceQueue::discardDqmEvent(MemRef_t* bufRef) {
00942
00943
00944
00945
00946
00947
00948
00949
00950
00951
00952
00953
00954
00955
00956
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985
00986
00987
00988 return true;
00989 }
00990
00991
00992
00993 bool FUResourceQueue::discardDqmEventWhileHalting(MemRef_t* bufRef) {
00994
00995
00996
00997
00998
00999
01000
01001
01002
01003
01004
01005
01006
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040 return true;
01041 }
01042
01043
01044
01045 void FUResourceQueue::postEndOfLumiSection(MemRef_t* bufRef) {
01046
01047
01048
01049
01050
01051
01052
01053
01054
01055
01056 }
01057
01058
01059
01060 void FUResourceQueue::dropEvent() {
01061
01062
01063
01064
01065
01066
01067 }
01068
01069
01070
01071 bool FUResourceQueue::handleCrashedEP(UInt_t runNumber, pid_t pid) {
01072 bool retval = false;
01073
01074
01075
01076
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
01087
01088
01089
01090
01091 return retval;
01092 }
01093
01094
01095
01096 void FUResourceQueue::shutDownClients() {
01097 isReadyToShutDown_ = true;
01098
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110
01111
01112
01113
01114
01115
01116
01117
01118
01119
01120
01121
01122
01123
01124 }
01125
01126
01127 void FUResourceQueue::clear() {
01128 for (UInt_t i = 0; i < resources_.size(); i++) {
01129 resources_[i]->release(false);
01130 delete resources_[i];
01131 }
01132 resources_.clear();
01133
01134 while (!freeResourceIds_.empty())
01135 freeResourceIds_.pop();
01136 }
01137
01139
01141
01142
01143
01144 void FUResourceQueue::resetCounters() {
01145
01146
01147
01148
01149
01150
01151
01152
01153 nbAllocated_ = nbPending_;
01154 nbCompleted_ = 0;
01155 nbSent_ = 0;
01156 nbSentError_ = 0;
01157 nbSentDqm_ = 0;
01158 nbPendingSMDiscards_ = 0;
01159 nbPendingSMDqmDiscards_ = 0;
01160 nbDiscarded_ = 0;
01161 nbLost_ = 0;
01162
01163 nbErrors_ = 0;
01164 nbCrcErrors_ = 0;
01165 nbAllocSent_ = 0;
01166
01167 sumOfSquares_ = 0;
01168 sumOfSizes_ = 0;
01169
01170
01171 }
01172
01173
01174
01175 UInt_t FUResourceQueue::nbClients() const {
01176 UInt_t result(0);
01177
01178
01179
01180
01181
01182 return result;
01183 }
01184
01185
01186
01187 vector<pid_t> FUResourceQueue::clientPrcIds() const {
01188 vector<pid_t> result;
01189
01190
01191
01192
01193
01194
01195
01196
01197 return result;
01198 }
01199
01200
01201
01202 string FUResourceQueue::clientPrcIdsAsString() const {
01203 stringstream ss;
01204
01205
01206
01207
01208
01209
01210
01211
01212
01213
01214
01215 return ss.str();
01216 }
01217
01218
01219
01220 vector<string> FUResourceQueue::cellStates() const {
01221 vector<string> result;
01222
01223
01224
01225
01226
01227
01228
01229
01230
01231
01232
01233
01234
01235
01236
01237
01238
01239
01240
01241
01242
01243
01244
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256
01257
01258
01259
01260 return result;
01261 }
01262
01263
01264 vector<string> FUResourceQueue::dqmCellStates() const {
01265 vector<string> result;
01266
01267
01268
01269
01270
01271
01272
01273
01274
01275
01276
01277
01278
01279
01280
01281
01282
01283
01284
01285
01286
01287
01288
01289 return result;
01290 }
01291
01292
01293
01294 vector<UInt_t> FUResourceQueue::cellEvtNumbers() const {
01295 vector<UInt_t> result;
01296
01297
01298
01299
01300
01301
01302
01303
01304
01305
01306 return result;
01307 }
01308
01309
01310
01311 vector<pid_t> FUResourceQueue::cellPrcIds() const {
01312 vector<pid_t> result;
01313
01314
01315
01316
01317
01318
01319
01320
01321
01322
01323 return result;
01324 }
01325
01326
01327
01328 vector<time_t> FUResourceQueue::cellTimeStamps() const {
01329 vector<time_t> result;
01330
01331
01332
01333
01334
01335
01336
01337
01338
01339
01340 return result;
01341 }
01342
01344
01346
01347
01348 void FUResourceQueue::lastResort() {
01349
01350
01351
01352
01353
01354
01355
01356
01357
01358
01359
01360
01361 }
01362