00001
00002
#include "EventFilter/StorageManager/interface/FragmentCollector.h"
#include "EventFilter/StorageManager/interface/ProgressMarker.h"


#include "IOPool/Streamer/interface/MsgHeader.h"
#include "IOPool/Streamer/interface/InitMessage.h"
#include "IOPool/Streamer/interface/DQMEventMessage.h"
#include "IOPool/Streamer/interface/FRDEventMessage.h"

#include "boost/bind.hpp"

#include "log4cplus/loggingmacros.h"

#include <algorithm>
#include <cstdlib>
#include <exception>
#include <fstream>
#include <utility>
00020
00021 using namespace edm;
00022 using namespace std;
00023
// Debug tracing is enabled when the FRAG_DEBUG environment variable is
// defined (any value). The variable is read once, during static
// initialization of this translation unit.
static const bool debugme = std::getenv("FRAG_DEBUG") != 0;

// Usage: FR_DEBUG << "text" << endl;
// The macro is written in if/else form so that the statement it produces is
// a complete if-else. With a bare "if(debugme) std::cerr", an "else" that
// follows a use of the macro would silently attach to the macro's hidden
// "if" instead of the caller's own "if".
#define FR_DEBUG if (!debugme) {} else std::cerr
00026
00027
00028 namespace stor
00029 {
00030
00031
// Failure record for the fragment collector thread: a flag saying that an
// exception was caught, plus the text describing it. A new record starts
// out with the flag cleared and an empty description.
struct SMFC_thread_data
{
  SMFC_thread_data()
    : exception_in_thread(false),
      reason_for_exception()
  {
  }

  // true once an exception has been recorded
  volatile bool exception_in_thread;
  // human-readable description of the recorded exception
  std::string reason_for_exception;
};
00042
00043 static SMFC_thread_data SMFragCollThread;
00044
00045 bool getSMFC_exceptionStatus() { return SMFragCollThread.exception_in_thread; }
00046 std::string getSMFC_reason4Exception() { return SMFragCollThread.reason_for_exception; }
00047
00048
00049 FragmentCollector::FragmentCollector(HLTInfo& h,Deleter d,
00050 log4cplus::Logger& applicationLogger,
00051 const string& config_str):
00052 cmd_q_(&(h.getCommandQueue())),
00053 evtbuf_q_(&(h.getEventQueue())),
00054 frag_q_(&(h.getFragmentQueue())),
00055 buffer_deleter_(d),
00056 prods_(0),
00057 info_(&h),
00058 lastStaleCheckTime_(time(0)),
00059 staleFragmentTimeout_(30),
00060 disks_(0),
00061 applicationLogger_(applicationLogger),
00062 writer_(new edm::ServiceManager(config_str)),
00063 dqmServiceManager_(new stor::DQMServiceManager())
00064 {
00065
00066
00067 event_area_.reserve(7000000);
00068 }
00069 FragmentCollector::FragmentCollector(std::auto_ptr<HLTInfo> info,Deleter d,
00070 log4cplus::Logger& applicationLogger,
00071 const string& config_str):
00072 cmd_q_(&(info.get()->getCommandQueue())),
00073 evtbuf_q_(&(info.get()->getEventQueue())),
00074 frag_q_(&(info.get()->getFragmentQueue())),
00075 buffer_deleter_(d),
00076 prods_(0),
00077 info_(info.get()),
00078 lastStaleCheckTime_(time(0)),
00079 staleFragmentTimeout_(30),
00080 disks_(0),
00081 applicationLogger_(applicationLogger),
00082 writer_(new edm::ServiceManager(config_str)),
00083 dqmServiceManager_(new stor::DQMServiceManager())
00084 {
00085
00086
00087 event_area_.reserve(7000000);
00088 }
00089
00090 FragmentCollector::~FragmentCollector()
00091 {
00092 }
00093
00094 void FragmentCollector::run(FragmentCollector* t)
00095 {
00096 try {
00097 t->processFragments();
00098 }
00099 catch(cms::Exception& e)
00100 {
00101 edm::LogError("StorageManager") << "Fatal error in FragmentCollector thread: " << "\n"
00102 << e.explainSelf() << std::endl;
00103 SMFragCollThread.exception_in_thread = true;
00104 SMFragCollThread.reason_for_exception = "Fatal error in FragmentCollector thread: \n" +
00105 e.explainSelf();
00106 }
00107
00108 }
00109
00110 void FragmentCollector::start()
00111 {
00112
00113
00114
00115
00116
00117 writer_->start();
00118 me_.reset(new boost::thread(boost::bind(FragmentCollector::run,this)));
00119 }
00120
00121 void FragmentCollector::join()
00122 {
00123 me_->join();
00124 }
00125
00126 void FragmentCollector::processFragments()
00127 {
00128
00129
00130
00131
00132
00133 bool done=false;
00134
00135 while(!done)
00136 {
00137 EventBuffer::ConsumerBuffer cb(*frag_q_);
00138 if(cb.size()==0) break;
00139 FragEntry* entry = (FragEntry*)cb.buffer();
00140 FR_DEBUG << "FragColl: " << (void*)this << " Got a buffer size="
00141 << entry->buffer_size_ << endl;
00142 switch(entry->code_)
00143 {
00144 case Header::EVENT:
00145 {
00146 FR_DEBUG << "FragColl: Got an Event" << endl;
00147 processEvent(entry);
00148 break;
00149 }
00150 case Header::DONE:
00151 {
00152
00153
00154 FR_DEBUG << "FragColl: Got a Done" << endl;
00155 done=true;
00156 break;
00157 }
00158 case Header::INIT:
00159 {
00160 FR_DEBUG << "FragColl: Got an Init" << endl;
00161 processHeader(entry);
00162 break;
00163 }
00164 case Header::DQM_EVENT:
00165 {
00166 FR_DEBUG << "FragColl: Got a DQM_Event" << endl;
00167 processDQMEvent(entry);
00168 break;
00169 }
00170 case Header::ERROR_EVENT:
00171 {
00172 FR_DEBUG << "FragColl: Got an Error_Event" << endl;
00173 processErrorEvent(entry);
00174 break;
00175 }
00176 case Header::FILE_CLOSE_REQUEST:
00177 {
00178 FR_DEBUG << "FragColl: Got a File Close Request message" << endl;
00179 writer_->closeFilesIfNeeded();
00180 break;
00181 }
00182 default:
00183 {
00184 FR_DEBUG << "FragColl: Got junk" << endl;
00185 break;
00186 }
00187 }
00188 }
00189
00190 FR_DEBUG << "FragColl: DONE!" << endl;
00191 writer_->stop();
00192 dqmServiceManager_->stop();
00193 }
00194
00195 void FragmentCollector::stop()
00196 {
00197
00198
00199
00200
00201 edm::EventBuffer::ProducerBuffer cb(*frag_q_);
00202 cb.commit();
00203 }
00204
00205 void FragmentCollector::processEvent(FragEntry* entry)
00206 {
00207 ProgressMarker::instance()->processing(true);
00208 if(entry->totalSegs_==1)
00209 {
00210 FR_DEBUG << "FragColl: Got an Event with one segment" << endl;
00211 FR_DEBUG << "FragColl: Event size " << entry->buffer_size_ << endl;
00212 FR_DEBUG << "FragColl: Event ID " << entry->id_ << endl;
00213
00214
00215 EventMsgView emsg(entry->buffer_address_);
00216 FR_DEBUG << "FragColl: writing event size " << entry->buffer_size_ << endl;
00217 writer_->manageEventMsg(emsg);
00218
00219 if (eventServer_.get() != NULL)
00220 {
00221 eventServer_->processEvent(emsg);
00222 }
00223
00224
00225 (*buffer_deleter_)(entry);
00226 return;
00227 }
00228
00229
00230 if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00231 {
00232 LOG4CPLUS_ERROR(applicationLogger_,
00233 "Invalid fragment ID received for event " << entry->id_
00234 << " in run " << entry->run_ << " with output module ID of "
00235 << entry->secondaryId_ << ", FU PID = "
00236 << entry->originatorPid_ << ", FU GUID = "
00237 << entry->originatorGuid_ << ". Fragment id is "
00238 << entry->segNumber_ << ", total number of fragments is "
00239 << entry->totalSegs_ << ".");
00240 (*buffer_deleter_)(entry);
00241 return;
00242 }
00243
00244
00245
00246
00247 pair<Collection::iterator,bool> rc =
00248 fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00249 entry->secondaryId_, entry->originatorPid_,
00250 entry->originatorGuid_),
00251 FragmentContainer()));
00252
00253
00254
00255 FragmentContainer& fragContainer = rc.first->second;
00256 std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00257 pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00258 eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00259 bool duplicateEntry = ! fragInsertResult.second;
00260
00261
00262
00263 if (duplicateEntry)
00264 {
00265 LOG4CPLUS_ERROR(applicationLogger_,
00266 "Duplicate fragment ID received for event " << entry->id_
00267 << " in run " << entry->run_ << " with output module ID of "
00268 << entry->secondaryId_ << ", FU PID = "
00269 << entry->originatorPid_ << ", FU GUID = "
00270 << entry->originatorGuid_ << ". Fragment id is "
00271 << entry->segNumber_ << ", total number of fragments is "
00272 << entry->totalSegs_ << ".");
00273 (*buffer_deleter_)(entry);
00274 return;
00275 }
00276
00277 else {
00278 fragContainer.lastFragmentTime_ = time(0);
00279 }
00280
00281 FR_DEBUG << "FragColl: added fragment with segment number "
00282 << entry->segNumber_ << endl;
00283
00284 if((int)eventFragmentMap.size()==entry->totalSegs_)
00285 {
00286 FR_DEBUG << "FragColl: completed an event with "
00287 << entry->totalSegs_ << " segments" << endl;
00288
00289
00290
00291
00292 int assembledSize = assembleFragments(eventFragmentMap);
00293
00294 EventMsgView emsg(&event_area_[0]);
00295 FR_DEBUG << "FragColl: writing event size " << assembledSize << endl;
00296 writer_->manageEventMsg(emsg);
00297
00298 if (eventServer_.get() != NULL)
00299 {
00300 eventServer_->processEvent(emsg);
00301 }
00302
00303
00304 fragment_area_.erase(rc.first);
00305
00306
00307 removeStaleFragments();
00308 }
00309 ProgressMarker::instance()->processing(false);
00310 }
00311
00312 void FragmentCollector::processHeader(FragEntry* entry)
00313 {
00314 ProgressMarker::instance()->processing(true);
00315 if(entry->totalSegs_==1)
00316 {
00317 FR_DEBUG << "FragColl: Got an INIT message with one segment" << endl;
00318 FR_DEBUG << "FragColl: Output Module ID " << entry->secondaryId_ << endl;
00319
00320
00321 InitMsgView msg(entry->buffer_address_);
00322 FR_DEBUG << "FragColl: writing INIT size " << entry->buffer_size_ << endl;
00323 writer_->manageInitMsg(catalog_, disks_, sourceId_, msg, *initMsgCollection_);
00324
00325 try
00326 {
00327 if (initMsgCollection_->addIfUnique(msg))
00328 {
00329
00330
00331
00332
00333
00334
00335 if (initMsgCollection_->size() > 1)
00336 {
00337 std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable =
00338 eventServer_->getConsumerTable();
00339 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator
00340 consumerIter;
00341 for (consumerIter = consumerTable.begin();
00342 consumerIter != consumerTable.end();
00343 ++consumerIter)
00344 {
00345 boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00346
00347
00348
00349 if (! consPtr->isProxyServer())
00350 {
00351 if (consPtr->getHLTOutputSelection().empty())
00352 {
00353
00354
00355 std::string errorString;
00356 errorString.append("ERROR: The configuration for this ");
00357 errorString.append("consumer does not specify an HLT output ");
00358 errorString.append("module.\nPlease specify one of the HLT ");
00359 errorString.append("output modules listed below as the ");
00360 errorString.append("SelectHLTOutput parameter ");
00361 errorString.append("in the InputSource configuration.\n");
00362 errorString.append(initMsgCollection_->getSelectionHelpString());
00363 errorString.append("\n");
00364 consPtr->setRegistryWarning(errorString);
00365 }
00366 }
00367 }
00368 }
00369 }
00370 }
00371 catch(cms::Exception& excpt)
00372 {
00373 char tidString[32];
00374 sprintf(tidString, "%d", entry->hltTid_);
00375 std::string logMsg = "receiveRegistryMessage: Error processing a ";
00376 logMsg.append("registry message from URL ");
00377 logMsg.append(entry->hltURL_);
00378 logMsg.append(" and Tid ");
00379 logMsg.append(tidString);
00380 logMsg.append(":\n");
00381 logMsg.append(excpt.what());
00382 logMsg.append("\n");
00383 logMsg.append(initMsgCollection_->getSelectionHelpString());
00384 FDEBUG(9) << logMsg << std::endl;
00385 LOG4CPLUS_ERROR(applicationLogger_, logMsg);
00386
00387 throw excpt;
00388 }
00389
00390 smRBSenderList_->registerDataSender(&entry->hltURL_[0], &entry->hltClassName_[0],
00391 entry->hltLocalId_, entry->hltInstance_, entry->hltTid_,
00392 entry->segNumber_, entry->totalSegs_, msg.size(),
00393 msg.outputModuleLabel(), msg.outputModuleId(),
00394 entry->rbBufferID_);
00395
00396
00397 (*buffer_deleter_)(entry);
00398 return;
00399 }
00400
00401
00402 if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00403 {
00404 LOG4CPLUS_ERROR(applicationLogger_,
00405 "Invalid fragment ID received for INIT " << entry->id_
00406 << " in run " << entry->run_ << " with output module ID of "
00407 << entry->secondaryId_ << ", FU PID = "
00408 << entry->originatorPid_ << ", FU GUID = "
00409 << entry->originatorGuid_ << ". Fragment id is "
00410 << entry->segNumber_ << ", total number of fragments is "
00411 << entry->totalSegs_ << ".");
00412 (*buffer_deleter_)(entry);
00413 return;
00414 }
00415
00416
00417
00418
00419 pair<Collection::iterator,bool> rc =
00420 fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00421 entry->secondaryId_, entry->originatorPid_,
00422 entry->originatorGuid_),
00423 FragmentContainer()));
00424
00425
00426
00427 FragmentContainer& fragContainer = rc.first->second;
00428 std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00429 pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00430 eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00431 bool duplicateEntry = ! fragInsertResult.second;
00432
00433
00434
00435 if (duplicateEntry)
00436 {
00437 LOG4CPLUS_ERROR(applicationLogger_,
00438 "Duplicate fragment ID received for INIT " << entry->id_
00439 << " in run " << entry->run_ << " with output module ID of "
00440 << entry->secondaryId_ << ", FU PID = "
00441 << entry->originatorPid_ << ", FU GUID = "
00442 << entry->originatorGuid_ << ". Fragment id is "
00443 << entry->segNumber_ << ", total number of fragments is "
00444 << entry->totalSegs_ << ".");
00445 (*buffer_deleter_)(entry);
00446 return;
00447 }
00448
00449 else {
00450 fragContainer.lastFragmentTime_ = time(0);
00451 }
00452
00453 FR_DEBUG << "FragColl: added INIT fragment with segment number "
00454 << entry->segNumber_ << endl;
00455
00456 if((int)eventFragmentMap.size()==entry->totalSegs_)
00457 {
00458 FR_DEBUG << "FragColl: completed an INIT message with "
00459 << entry->totalSegs_ << " segments" << endl;
00460
00461
00462
00463
00464 int assembledSize = assembleFragments(eventFragmentMap);
00465
00466 InitMsgView msg(&event_area_[0]);
00467 FR_DEBUG << "FragColl: writing INIT size " << assembledSize << endl;
00468 writer_->manageInitMsg(catalog_, disks_, sourceId_, msg, *initMsgCollection_);
00469
00470 try
00471 {
00472 if (initMsgCollection_->addIfUnique(msg))
00473 {
00474
00475
00476
00477
00478
00479
00480 if (initMsgCollection_->size() > 1)
00481 {
00482 std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable =
00483 eventServer_->getConsumerTable();
00484 std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator
00485 consumerIter;
00486 for (consumerIter = consumerTable.begin();
00487 consumerIter != consumerTable.end();
00488 ++consumerIter)
00489 {
00490 boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00491
00492
00493
00494 if (! consPtr->isProxyServer())
00495 {
00496 if (consPtr->getHLTOutputSelection().empty())
00497 {
00498
00499
00500 std::string errorString;
00501 errorString.append("ERROR: The configuration for this ");
00502 errorString.append("consumer does not specify an HLT output ");
00503 errorString.append("module.\nPlease specify one of the HLT ");
00504 errorString.append("output modules listed below as the ");
00505 errorString.append("SelectHLTOutput parameter ");
00506 errorString.append("in the InputSource configuration.\n");
00507 errorString.append(initMsgCollection_->getSelectionHelpString());
00508 errorString.append("\n");
00509 consPtr->setRegistryWarning(errorString);
00510 }
00511 }
00512 }
00513 }
00514 }
00515 }
00516 catch(cms::Exception& excpt)
00517 {
00518 char tidString[32];
00519 sprintf(tidString, "%d", entry->hltTid_);
00520 std::string logMsg = "receiveRegistryMessage: Error processing a ";
00521 logMsg.append("registry message from URL ");
00522 logMsg.append(entry->hltURL_);
00523 logMsg.append(" and Tid ");
00524 logMsg.append(tidString);
00525 logMsg.append(":\n");
00526 logMsg.append(excpt.what());
00527 logMsg.append("\n");
00528 logMsg.append(initMsgCollection_->getSelectionHelpString());
00529 FDEBUG(9) << logMsg << std::endl;
00530 LOG4CPLUS_ERROR(applicationLogger_, logMsg);
00531
00532 throw excpt;
00533 }
00534
00535 smRBSenderList_->registerDataSender(&entry->hltURL_[0], &entry->hltClassName_[0],
00536 entry->hltLocalId_, entry->hltInstance_, entry->hltTid_,
00537 entry->segNumber_, entry->totalSegs_, msg.size(),
00538 msg.outputModuleLabel(), msg.outputModuleId(),
00539 entry->rbBufferID_);
00540
00541
00542 fragment_area_.erase(rc.first);
00543
00544
00545 removeStaleFragments();
00546 }
00547 ProgressMarker::instance()->processing(false);
00548 }
00549
00550 void FragmentCollector::processDQMEvent(FragEntry* entry)
00551 {
00552 ProgressMarker::instance()->processing(true);
00553 if(entry->totalSegs_==1)
00554 {
00555 FR_DEBUG << "FragColl: Got a DQM_Event with one segment" << endl;
00556 FR_DEBUG << "FragColl: DQM_Event size " << entry->buffer_size_ << endl;
00557 FR_DEBUG << "FragColl: DQM_Event ID " << entry->id_ << endl;
00558 FR_DEBUG << "FragColl: DQM_Event folderID " << entry->secondaryId_ << endl;
00559
00560 DQMEventMsgView dqmEventView(entry->buffer_address_);
00561 dqmServiceManager_->manageDQMEventMsg(dqmEventView);
00562 (*buffer_deleter_)(entry);
00563 return;
00564 }
00565
00566
00567 if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00568 {
00569 LOG4CPLUS_ERROR(applicationLogger_,
00570 "Invalid fragment ID received for DQM event " << entry->id_
00571 << " in run " << entry->run_ << " with folder ID of "
00572 << entry->secondaryId_ << ", FU PID = "
00573 << entry->originatorPid_ << ", FU GUID = "
00574 << entry->originatorGuid_ << ". Fragment id is "
00575 << entry->segNumber_ << ", total number of fragments is "
00576 << entry->totalSegs_ << ".");
00577 (*buffer_deleter_)(entry);
00578 return;
00579 }
00580
00581
00582
00583
00584 pair<Collection::iterator,bool> rc =
00585 fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00586 entry->secondaryId_, entry->originatorPid_,
00587 entry->originatorGuid_),
00588 FragmentContainer()));
00589
00590
00591
00592 FragmentContainer& fragContainer = rc.first->second;
00593 std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00594 pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00595 eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00596 bool duplicateEntry = ! fragInsertResult.second;
00597
00598
00599
00600 if (duplicateEntry)
00601 {
00602 LOG4CPLUS_ERROR(applicationLogger_,
00603 "Duplicate fragment ID received for DQM event " << entry->id_
00604 << " in run " << entry->run_ << " with folder ID of "
00605 << entry->secondaryId_ << ", FU PID = "
00606 << entry->originatorPid_ << ", FU GUID = "
00607 << entry->originatorGuid_ << ". Fragment id is "
00608 << entry->segNumber_ << ", total number of fragments is "
00609 << entry->totalSegs_ << ".");
00610 (*buffer_deleter_)(entry);
00611 return;
00612 }
00613
00614 else {
00615 fragContainer.lastFragmentTime_ = time(0);
00616 }
00617
00618 FR_DEBUG << "FragColl: added DQM fragment" << endl;
00619
00620 if((int)eventFragmentMap.size()==entry->totalSegs_)
00621 {
00622 FR_DEBUG << "FragColl: completed a DQM_event with "
00623 << entry->totalSegs_ << " segments" << endl;
00624
00625
00626
00627
00628 assembleFragments(eventFragmentMap);
00629
00630
00631 DQMEventMsgView dqmEventView(&event_area_[0]);
00632 dqmServiceManager_->manageDQMEventMsg(dqmEventView);
00633
00634
00635 fragment_area_.erase(rc.first);
00636
00637
00638 removeStaleFragments();
00639 }
00640 ProgressMarker::instance()->processing(false);
00641 }
00642
00643 void FragmentCollector::processErrorEvent(FragEntry* entry)
00644 {
00645 ProgressMarker::instance()->processing(true);
00646 if(entry->totalSegs_==1)
00647 {
00648 FR_DEBUG << "FragColl: Got an Error Event with one segment" << endl;
00649 FR_DEBUG << "FragColl: Event size " << entry->buffer_size_ << endl;
00650 FR_DEBUG << "FragColl: Event ID " << entry->id_ << endl;
00651
00652
00653 FRDEventMsgView emsg(entry->buffer_address_);
00654 FR_DEBUG << "FragColl: writing error event size " << entry->buffer_size_ << endl;
00655 writer_->manageErrorEventMsg(catalog_, disks_, sourceId_, emsg);
00656
00657
00658 (*buffer_deleter_)(entry);
00659 return;
00660 }
00661
00662
00663 if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00664 {
00665 LOG4CPLUS_ERROR(applicationLogger_,
00666 "Invalid fragment ID received for Error event " << entry->id_
00667 << " in run " << entry->run_ << " with output module ID of "
00668 << entry->secondaryId_ << ", FU PID = "
00669 << entry->originatorPid_ << ", FU GUID = "
00670 << entry->originatorGuid_ << ". Fragment id is "
00671 << entry->segNumber_ << ", total number of fragments is "
00672 << entry->totalSegs_ << ".");
00673 (*buffer_deleter_)(entry);
00674 return;
00675 }
00676
00677
00678
00679
00680 pair<Collection::iterator,bool> rc =
00681 fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00682 entry->secondaryId_, entry->originatorPid_,
00683 entry->originatorGuid_),
00684 FragmentContainer()));
00685
00686
00687
00688 FragmentContainer& fragContainer = rc.first->second;
00689 std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00690 pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00691 eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00692 bool duplicateEntry = ! fragInsertResult.second;
00693
00694
00695
00696 if (duplicateEntry)
00697 {
00698 LOG4CPLUS_ERROR(applicationLogger_,
00699 "Duplicate fragment ID received for Error event " << entry->id_
00700 << " in run " << entry->run_ << " with output module ID of "
00701 << entry->secondaryId_ << ", FU PID = "
00702 << entry->originatorPid_ << ", FU GUID = "
00703 << entry->originatorGuid_ << ". Fragment id is "
00704 << entry->segNumber_ << ", total number of fragments is "
00705 << entry->totalSegs_ << ".");
00706 (*buffer_deleter_)(entry);
00707 return;
00708 }
00709
00710 else {
00711 fragContainer.lastFragmentTime_ = time(0);
00712 }
00713
00714 FR_DEBUG << "FragColl: added Error event fragment" << endl;
00715
00716 if((int)eventFragmentMap.size()==entry->totalSegs_)
00717 {
00718 FR_DEBUG << "FragColl: completed an error event with "
00719 << entry->totalSegs_ << " segments" << endl;
00720
00721
00722
00723
00724 int assembledSize = assembleFragments(eventFragmentMap);
00725
00726 FRDEventMsgView emsg(&event_area_[0]);
00727 FR_DEBUG << "FragColl: writing error event size " << assembledSize << endl;
00728 writer_->manageErrorEventMsg(catalog_, disks_, sourceId_, emsg);
00729
00730
00731 fragment_area_.erase(rc.first);
00732
00733
00734 removeStaleFragments();
00735 }
00736 ProgressMarker::instance()->processing(false);
00737 }
00738
00752 int FragmentCollector::assembleFragments(std::map<int, FragEntry>& fragmentMap)
00753 {
00754
00755 unsigned int max_sizePerFrame = fragmentMap[1].buffer_size_;
00756 if((fragmentMap.size() * max_sizePerFrame) > event_area_.capacity())
00757 {
00758 event_area_.resize(fragmentMap.size() * max_sizePerFrame);
00759 }
00760 unsigned char* pos = (unsigned char*)&event_area_[0];
00761
00762 int sum=0;
00763 unsigned int lastpos=0;
00764 for (unsigned int idx = 1; idx <= fragmentMap.size(); ++idx)
00765 {
00766 FragEntry& workingEntry = fragmentMap[idx];
00767 int dsize = workingEntry.buffer_size_;
00768 sum+=dsize;
00769 unsigned char* from=(unsigned char*)workingEntry.buffer_address_;
00770 copy(from,from+dsize,pos+lastpos);
00771 lastpos = lastpos + dsize;
00772
00773 (*buffer_deleter_)(&workingEntry);
00774 }
00775
00776 return sum;
00777 }
00778
00786 int FragmentCollector::removeStaleFragments()
00787 {
00788
00789
00790 if (fragment_area_.size() == 0) {return 0;}
00791
00792
00793
00794
00795
00796
00797
00798
00799 time_t now = time(0);
00800 if ((now - lastStaleCheckTime_) < staleFragmentTimeout_) {return 0;}
00801
00802 lastStaleCheckTime_ = now;
00803
00804
00805
00806
00807
00808
00809 std::vector<FragKey> staleList;
00810 Collection::iterator areaIter;
00811 for (areaIter = fragment_area_.begin();
00812 areaIter != fragment_area_.end();
00813 ++areaIter)
00814 {
00815 FragmentContainer& fragContainer = areaIter->second;
00816 std::map<int, FragEntry>::iterator fragIter =
00817 fragContainer.fragmentMap_.begin();
00818 FragEntry& workingEntry = fragIter->second;
00819
00820
00821
00822
00823
00824
00825 if (fragContainer.lastFragmentTime_ > 0 &&
00826 fragContainer.lastFragmentTime_ >= fragContainer.creationTime_ &&
00827 (now - fragContainer.lastFragmentTime_) > staleFragmentTimeout_)
00828 {
00829 LOG4CPLUS_WARN(applicationLogger_,
00830 "Deleting a stale fragment set for event "
00831 << workingEntry.id_ << " in run " << workingEntry.run_
00832 << " with secondary ID of " << workingEntry.secondaryId_
00833 << ", FU PID = " << workingEntry.originatorPid_
00834 << ", FU GUID = " << workingEntry.originatorGuid_
00835 << ". The number of fragments received was "
00836 << fragContainer.fragmentMap_.size()
00837 << ", and the total number of fragments expected was "
00838 << workingEntry.totalSegs_ << ".");
00839 staleList.push_back(areaIter->first);
00840 }
00841 }
00842
00843
00844 for (unsigned int idx = 0; idx < staleList.size(); ++idx)
00845 {
00846 fragment_area_.erase(staleList[idx]);
00847 }
00848
00849 return staleList.size();
00850 }
00851 }