CMS 3D CMS Logo

FragmentCollector.cc

Go to the documentation of this file.
00001 // $Id: FragmentCollector.cc,v 1.44 2008/12/19 23:29:10 biery Exp $
00002 
#include "EventFilter/StorageManager/interface/FragmentCollector.h"
#include "EventFilter/StorageManager/interface/ProgressMarker.h"


#include "IOPool/Streamer/interface/MsgHeader.h"
#include "IOPool/Streamer/interface/InitMessage.h"
#include "IOPool/Streamer/interface/DQMEventMessage.h"
#include "IOPool/Streamer/interface/FRDEventMessage.h"

#include "boost/bind.hpp"

#include "log4cplus/loggingmacros.h"

#include <algorithm>
#include <cstdlib>
#include <exception>
#include <fstream>
#include <utility>
00020 
00021 using namespace edm;
00022 using namespace std;
00023 
// Debug tracing for the fragment collector is switched on by defining the
// FRAG_DEBUG environment variable (any value).  The variable is sampled once,
// when this file's statics are initialized.
static const bool debugme = std::getenv("FRAG_DEBUG")!=0;

// Usage: FR_DEBUG << "text" << value << endl;
// The stream expression is only evaluated when debugme is true.  The macro is
// written as a complete if/else so that it can be used as the body of an
// unbraced if without capturing the caller's own else branch.
#define FR_DEBUG if(!debugme) {} else std::cerr
00026 
00027 
00028 namespace stor
00029 {
00030 
00031 // TODO fixme: this quick fix to give thread status should be reviewed!
// Status record for the fragment collector thread: a flag telling whether an
// exception was recorded, and the text describing it.  A freshly constructed
// record reports "no exception" and an empty reason.
struct SMFC_thread_data
{
  SMFC_thread_data() :
    exception_in_thread(false),
    reason_for_exception()
  {
  }

  // true once an exception has been recorded
  volatile bool exception_in_thread;
  // human-readable description of the recorded exception (empty by default)
  std::string reason_for_exception;
};
00042 
00043   static SMFC_thread_data SMFragCollThread;
00044 
00045   bool getSMFC_exceptionStatus() { return SMFragCollThread.exception_in_thread; }
00046   std::string getSMFC_reason4Exception() { return SMFragCollThread.reason_for_exception; }
00047 
00048 
00049   FragmentCollector::FragmentCollector(HLTInfo& h,Deleter d,
00050                                        log4cplus::Logger& applicationLogger,
00051                                        const string& config_str):
00052     cmd_q_(&(h.getCommandQueue())),
00053     evtbuf_q_(&(h.getEventQueue())),
00054     frag_q_(&(h.getFragmentQueue())),
00055     buffer_deleter_(d),
00056     prods_(0),
00057     info_(&h), 
00058     lastStaleCheckTime_(time(0)),
00059     staleFragmentTimeout_(30),
00060     disks_(0),
00061     applicationLogger_(applicationLogger),
00062     writer_(new edm::ServiceManager(config_str)),
00063     dqmServiceManager_(new stor::DQMServiceManager())
00064   {
00065     // supposed to have given parameterSet smConfigString to writer_
00066     // at ctor
00067     event_area_.reserve(7000000);
00068   }
00069   FragmentCollector::FragmentCollector(std::auto_ptr<HLTInfo> info,Deleter d,
00070                                        log4cplus::Logger& applicationLogger,
00071                                        const string& config_str):
00072     cmd_q_(&(info.get()->getCommandQueue())),
00073     evtbuf_q_(&(info.get()->getEventQueue())),
00074     frag_q_(&(info.get()->getFragmentQueue())),
00075     buffer_deleter_(d),
00076     prods_(0),
00077     info_(info.get()), 
00078     lastStaleCheckTime_(time(0)),
00079     staleFragmentTimeout_(30),
00080     disks_(0),
00081     applicationLogger_(applicationLogger),
00082     writer_(new edm::ServiceManager(config_str)),
00083     dqmServiceManager_(new stor::DQMServiceManager())
00084   {
00085     // supposed to have given parameterSet smConfigString to writer_
00086     // at ctor
00087     event_area_.reserve(7000000);
00088   }
00089 
00090   FragmentCollector::~FragmentCollector()
00091   {
00092   }
00093 
00094   void FragmentCollector::run(FragmentCollector* t)
00095   {
00096     try {
00097       t->processFragments();
00098     }
00099     catch(cms::Exception& e)
00100     {
00101        edm::LogError("StorageManager") << "Fatal error in FragmentCollector thread: " << "\n"
00102                  << e.explainSelf() << std::endl;
00103        SMFragCollThread.exception_in_thread = true;
00104        SMFragCollThread.reason_for_exception = "Fatal error in FragmentCollector thread: \n" +
00105           e.explainSelf();
00106     }
00107     // just let the thread end here there is no cleanup, no recovery possible
00108   }
00109 
00110   void FragmentCollector::start()
00111   {
00112     // 14-Oct-2008, KAB - avoid race condition by starting writers first
00113     // (otherwise INIT message could be received and processed before
00114     // the writers are started (and whatever initialization is done in the
00115     // writers when INIT messages are processed could be wiped out by 
00116     // the start command)
00117     writer_->start();
00118     me_.reset(new boost::thread(boost::bind(FragmentCollector::run,this)));
00119   }
00120 
00121   void FragmentCollector::join()
00122   {
00123     me_->join();
00124   }
00125 
00126   void FragmentCollector::processFragments()
00127   {
00128     // everything comes in on the fragment queue, even
00129     // command-like messages.  we need to dispatch things
00130     // we recognize - either execute the command, forward it
00131     // to the command queue, or process it for output to the 
00132     // event queue.
00133     bool done=false;
00134 
00135     while(!done)
00136       {
00137         EventBuffer::ConsumerBuffer cb(*frag_q_);
00138         if(cb.size()==0) break;
00139         FragEntry* entry = (FragEntry*)cb.buffer();
00140         FR_DEBUG << "FragColl: " << (void*)this << " Got a buffer size="
00141                  << entry->buffer_size_ << endl;
00142         switch(entry->code_)
00143           {
00144           case Header::EVENT:
00145             {
00146               FR_DEBUG << "FragColl: Got an Event" << endl;
00147               processEvent(entry);
00148               break;
00149             }
00150           case Header::DONE:
00151             {
00152               // make sure that this is actually sent by the controller! (JBK)
00153               // this does nothing currently
00154               FR_DEBUG << "FragColl: Got a Done" << endl;
00155               done=true;
00156               break;
00157             }
00158           case Header::INIT:
00159             {
00160               FR_DEBUG << "FragColl: Got an Init" << endl;
00161               processHeader(entry);
00162               break;
00163             }
00164           case Header::DQM_EVENT:
00165             {
00166               FR_DEBUG << "FragColl: Got a DQM_Event" << endl;
00167               processDQMEvent(entry);
00168               break;
00169             }
00170           case Header::ERROR_EVENT:
00171             {
00172               FR_DEBUG << "FragColl: Got an Error_Event" << endl;
00173               processErrorEvent(entry);
00174               break;
00175             }
00176           case Header::FILE_CLOSE_REQUEST:
00177             {
00178               FR_DEBUG << "FragColl: Got a File Close Request message" << endl;
00179               writer_->closeFilesIfNeeded();
00180               break;
00181             }
00182           default:
00183             {
00184               FR_DEBUG << "FragColl: Got junk" << endl;
00185               break; // lets ignore other things for now
00186             }
00187           }
00188       }
00189     
00190     FR_DEBUG << "FragColl: DONE!" << endl;
00191     writer_->stop();
00192     dqmServiceManager_->stop();
00193   }
00194 
00195   void FragmentCollector::stop()
00196   {
00197     // called from a different thread - trigger completion to the
00198     // fragment collector, which will cause a completion of the 
00199     // event processor
00200 
00201     edm::EventBuffer::ProducerBuffer cb(*frag_q_);
00202     cb.commit();
00203   }
00204 
00205   void FragmentCollector::processEvent(FragEntry* entry)
00206   {
00207     ProgressMarker::instance()->processing(true);
00208     if(entry->totalSegs_==1)
00209     {
00210         FR_DEBUG << "FragColl: Got an Event with one segment" << endl;
00211         FR_DEBUG << "FragColl: Event size " << entry->buffer_size_ << endl;
00212         FR_DEBUG << "FragColl: Event ID " << entry->id_ << endl;
00213 
00214         // send immediately
00215         EventMsgView emsg(entry->buffer_address_);
00216         FR_DEBUG << "FragColl: writing event size " << entry->buffer_size_ << endl;
00217         writer_->manageEventMsg(emsg);
00218 
00219         if (eventServer_.get() != NULL)
00220         {
00221           eventServer_->processEvent(emsg);
00222         }
00223 
00224         // make sure the buffer properly released
00225         (*buffer_deleter_)(entry);
00226         return;
00227     } // end of single segment test
00228 
00229     // verify that the segment number of the fragment is valid
00230     if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00231     {
00232       LOG4CPLUS_ERROR(applicationLogger_,
00233                       "Invalid fragment ID received for event " << entry->id_
00234                       << " in run " << entry->run_ << " with output module ID of "
00235                       << entry->secondaryId_ << ", FU PID = "
00236                       << entry->originatorPid_ << ", FU GUID = "
00237                       << entry->originatorGuid_  << ".  Fragment id is "
00238                       << entry->segNumber_ << ", total number of fragments is "
00239                       << entry->totalSegs_ << ".");
00240       (*buffer_deleter_)(entry);
00241       return;
00242     }
00243 
00244     // add a new entry to the fragment area (Collection) based on this
00245     // fragment's key (or fetch the existing entry if a fragment with the
00246     // same key has already been processed)
00247     pair<Collection::iterator,bool> rc =
00248       fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00249                                               entry->secondaryId_, entry->originatorPid_,
00250                                               entry->originatorGuid_),
00251                                       FragmentContainer()));
00252 
00253     // add this fragment to the map of fragments for this event
00254     // (fragment map has key/value of fragment/segment ID and FragEntry)
00255     FragmentContainer& fragContainer = rc.first->second;
00256     std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00257     pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00258       eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00259     bool duplicateEntry = ! fragInsertResult.second;
00260 
00261     // if the specified fragment/segment ID already existed in the
00262     // map, complain and clean up
00263     if (duplicateEntry)
00264     {
00265       LOG4CPLUS_ERROR(applicationLogger_,
00266                       "Duplicate fragment ID received for event " << entry->id_
00267                       << " in run " << entry->run_ << " with output module ID of "
00268                       << entry->secondaryId_ << ", FU PID = "
00269                       << entry->originatorPid_ << ", FU GUID = "
00270                       << entry->originatorGuid_  << ".  Fragment id is "
00271                       << entry->segNumber_ << ", total number of fragments is "
00272                       << entry->totalSegs_ << ".");
00273       (*buffer_deleter_)(entry);
00274       return;
00275     }
00276     // otherwise, we update the last fragment time for this event
00277     else {
00278       fragContainer.lastFragmentTime_ = time(0);
00279     }
00280 
00281     FR_DEBUG << "FragColl: added fragment with segment number "
00282              << entry->segNumber_ << endl;
00283 
00284     if((int)eventFragmentMap.size()==entry->totalSegs_)
00285     {
00286         FR_DEBUG << "FragColl: completed an event with "
00287                  << entry->totalSegs_ << " segments" << endl;
00288 
00289         // the assembleFragments method has several side-effects:
00290         // - the event_area_ is filled, and it may be resized
00291         // - the fragment entries are deleted using the buffer_deleter_
00292         int assembledSize = assembleFragments(eventFragmentMap);
00293 
00294         EventMsgView emsg(&event_area_[0]);
00295         FR_DEBUG << "FragColl: writing event size " << assembledSize << endl;
00296         writer_->manageEventMsg(emsg);
00297 
00298         if (eventServer_.get() != NULL)
00299         {
00300           eventServer_->processEvent(emsg);
00301         }
00302 
00303         // remove the entry from the map
00304         fragment_area_.erase(rc.first);
00305 
00306         // check for stale fragments
00307         removeStaleFragments();
00308     }
00309     ProgressMarker::instance()->processing(false);
00310   }
00311 
00312   void FragmentCollector::processHeader(FragEntry* entry)
00313   {
00314     ProgressMarker::instance()->processing(true);
00315     if(entry->totalSegs_==1)
00316     {
00317       FR_DEBUG << "FragColl: Got an INIT message with one segment" << endl;
00318       FR_DEBUG << "FragColl: Output Module ID " << entry->secondaryId_ << endl;
00319 
00320       // send immediately
00321       InitMsgView msg(entry->buffer_address_);
00322       FR_DEBUG << "FragColl: writing INIT size " << entry->buffer_size_ << endl;
00323       writer_->manageInitMsg(catalog_, disks_, sourceId_, msg, *initMsgCollection_);
00324 
00325       try
00326       {
00327         if (initMsgCollection_->addIfUnique(msg))
00328         {
00329           // check if any currently connected consumers did not specify
00330           // an HLT output module label and we now have multiple, different,
00331           // INIT messages.  If so, we need to complain because the
00332           // SelectHLTOutput parameter needs to be specified when there
00333           // is more than one HLT output module (and correspondingly, more
00334           // than one INIT message)
00335           if (initMsgCollection_->size() > 1)
00336           {
00337             std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 
00338               eventServer_->getConsumerTable();
00339             std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 
00340               consumerIter;
00341             for (consumerIter = consumerTable.begin();
00342                  consumerIter != consumerTable.end();
00343                  ++consumerIter)
00344             {
00345               boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00346 
00347               // for regular consumers, we need to test whether the consumer
00348               // configuration specified an HLT output module
00349               if (! consPtr->isProxyServer())
00350               {
00351                 if (consPtr->getHLTOutputSelection().empty())
00352                 {
00353                   // store a warning message in the consumer pipe to be
00354                   // sent to the consumer at the next opportunity
00355                   std::string errorString;
00356                   errorString.append("ERROR: The configuration for this ");
00357                   errorString.append("consumer does not specify an HLT output ");
00358                   errorString.append("module.\nPlease specify one of the HLT ");
00359                   errorString.append("output modules listed below as the ");
00360                   errorString.append("SelectHLTOutput parameter ");
00361                   errorString.append("in the InputSource configuration.\n");
00362                   errorString.append(initMsgCollection_->getSelectionHelpString());
00363                   errorString.append("\n");
00364                   consPtr->setRegistryWarning(errorString);
00365                 }
00366               }
00367             }
00368           }
00369         }
00370       }
00371       catch(cms::Exception& excpt)
00372       {
00373         char tidString[32];
00374         sprintf(tidString, "%d", entry->hltTid_);
00375         std::string logMsg = "receiveRegistryMessage: Error processing a ";
00376         logMsg.append("registry message from URL ");
00377         logMsg.append(entry->hltURL_);
00378         logMsg.append(" and Tid ");
00379         logMsg.append(tidString);
00380         logMsg.append(":\n");
00381         logMsg.append(excpt.what());
00382         logMsg.append("\n");
00383         logMsg.append(initMsgCollection_->getSelectionHelpString());
00384         FDEBUG(9) << logMsg << std::endl;
00385         LOG4CPLUS_ERROR(applicationLogger_, logMsg);
00386 
00387         throw excpt;
00388       }
00389 
00390       smRBSenderList_->registerDataSender(&entry->hltURL_[0], &entry->hltClassName_[0],
00391                                           entry->hltLocalId_, entry->hltInstance_, entry->hltTid_,
00392                                           entry->segNumber_, entry->totalSegs_, msg.size(),
00393                                           msg.outputModuleLabel(), msg.outputModuleId(),
00394                                           entry->rbBufferID_);
00395 
00396       // make sure the buffer properly released
00397       (*buffer_deleter_)(entry);
00398       return;
00399     } // end of single segment test
00400 
00401     // verify that the segment number of the fragment is valid
00402     if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00403     {
00404       LOG4CPLUS_ERROR(applicationLogger_,
00405                       "Invalid fragment ID received for INIT " << entry->id_
00406                       << " in run " << entry->run_ << " with output module ID of "
00407                       << entry->secondaryId_ << ", FU PID = "
00408                       << entry->originatorPid_ << ", FU GUID = "
00409                       << entry->originatorGuid_  << ".  Fragment id is "
00410                       << entry->segNumber_ << ", total number of fragments is "
00411                       << entry->totalSegs_ << ".");
00412       (*buffer_deleter_)(entry);
00413       return;
00414     }
00415 
00416     // add a new entry to the fragment area (Collection) based on this
00417     // fragment's key (or fetch the existing entry if a fragment with the
00418     // same key has already been processed)
00419     pair<Collection::iterator,bool> rc =
00420       fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00421                                               entry->secondaryId_, entry->originatorPid_,
00422                                               entry->originatorGuid_),
00423                                       FragmentContainer()));
00424 
00425     // add this fragment to the map of fragments for this event
00426     // (fragment map has key/value of fragment/segment ID and FragEntry)
00427     FragmentContainer& fragContainer = rc.first->second;
00428     std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00429     pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00430       eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00431     bool duplicateEntry = ! fragInsertResult.second;
00432 
00433     // if the specified fragment/segment ID already existed in the
00434     // map, complain and clean up
00435     if (duplicateEntry)
00436     {
00437       LOG4CPLUS_ERROR(applicationLogger_,
00438                       "Duplicate fragment ID received for INIT " << entry->id_
00439                       << " in run " << entry->run_ << " with output module ID of "
00440                       << entry->secondaryId_ << ", FU PID = "
00441                       << entry->originatorPid_ << ", FU GUID = "
00442                       << entry->originatorGuid_  << ".  Fragment id is "
00443                       << entry->segNumber_ << ", total number of fragments is "
00444                       << entry->totalSegs_ << ".");
00445       (*buffer_deleter_)(entry);
00446       return;
00447     }
00448     // otherwise, we update the last fragment time for this event
00449     else {
00450       fragContainer.lastFragmentTime_ = time(0);
00451     }
00452 
00453     FR_DEBUG << "FragColl: added INIT fragment with segment number "
00454              << entry->segNumber_ << endl;
00455 
00456     if((int)eventFragmentMap.size()==entry->totalSegs_)
00457     {
00458       FR_DEBUG << "FragColl: completed an INIT message with "
00459                << entry->totalSegs_ << " segments" << endl;
00460 
00461       // the assembleFragments method has several side-effects:
00462       // - the event_area_ is filled, and it may be resized
00463       // - the fragment entries are deleted using the buffer_deleter_
00464       int assembledSize = assembleFragments(eventFragmentMap);
00465 
00466       InitMsgView msg(&event_area_[0]);
00467       FR_DEBUG << "FragColl: writing INIT size " << assembledSize << endl;
00468       writer_->manageInitMsg(catalog_, disks_, sourceId_, msg, *initMsgCollection_);
00469 
00470       try
00471       {
00472         if (initMsgCollection_->addIfUnique(msg))
00473         {
00474           // check if any currently connected consumers did not specify
00475           // an HLT output module label and we now have multiple, different,
00476           // INIT messages.  If so, we need to complain because the
00477           // SelectHLTOutput parameter needs to be specified when there
00478           // is more than one HLT output module (and correspondingly, more
00479           // than one INIT message)
00480           if (initMsgCollection_->size() > 1)
00481           {
00482             std::map< uint32, boost::shared_ptr<ConsumerPipe> > consumerTable = 
00483               eventServer_->getConsumerTable();
00484             std::map< uint32, boost::shared_ptr<ConsumerPipe> >::const_iterator 
00485               consumerIter;
00486             for (consumerIter = consumerTable.begin();
00487                  consumerIter != consumerTable.end();
00488                  ++consumerIter)
00489             {
00490               boost::shared_ptr<ConsumerPipe> consPtr = consumerIter->second;
00491 
00492               // for regular consumers, we need to test whether the consumer
00493               // configuration specified an HLT output module
00494               if (! consPtr->isProxyServer())
00495               {
00496                 if (consPtr->getHLTOutputSelection().empty())
00497                 {
00498                   // store a warning message in the consumer pipe to be
00499                   // sent to the consumer at the next opportunity
00500                   std::string errorString;
00501                   errorString.append("ERROR: The configuration for this ");
00502                   errorString.append("consumer does not specify an HLT output ");
00503                   errorString.append("module.\nPlease specify one of the HLT ");
00504                   errorString.append("output modules listed below as the ");
00505                   errorString.append("SelectHLTOutput parameter ");
00506                   errorString.append("in the InputSource configuration.\n");
00507                   errorString.append(initMsgCollection_->getSelectionHelpString());
00508                   errorString.append("\n");
00509                   consPtr->setRegistryWarning(errorString);
00510                 }
00511               }
00512             }
00513           }
00514         }
00515       }
00516       catch(cms::Exception& excpt)
00517       {
00518         char tidString[32];
00519         sprintf(tidString, "%d", entry->hltTid_);
00520         std::string logMsg = "receiveRegistryMessage: Error processing a ";
00521         logMsg.append("registry message from URL ");
00522         logMsg.append(entry->hltURL_);
00523         logMsg.append(" and Tid ");
00524         logMsg.append(tidString);
00525         logMsg.append(":\n");
00526         logMsg.append(excpt.what());
00527         logMsg.append("\n");
00528         logMsg.append(initMsgCollection_->getSelectionHelpString());
00529         FDEBUG(9) << logMsg << std::endl;
00530         LOG4CPLUS_ERROR(applicationLogger_, logMsg);
00531 
00532         throw excpt;
00533       }
00534 
00535       smRBSenderList_->registerDataSender(&entry->hltURL_[0], &entry->hltClassName_[0],
00536                                           entry->hltLocalId_, entry->hltInstance_, entry->hltTid_,
00537                                           entry->segNumber_, entry->totalSegs_, msg.size(),
00538                                           msg.outputModuleLabel(), msg.outputModuleId(),
00539                                           entry->rbBufferID_);
00540 
00541       // remove the entry from the map
00542       fragment_area_.erase(rc.first);
00543 
00544       // check for stale fragments
00545       removeStaleFragments();
00546     }
00547     ProgressMarker::instance()->processing(false);
00548   }
00549 
00550   void FragmentCollector::processDQMEvent(FragEntry* entry)
00551   {
00552     ProgressMarker::instance()->processing(true);
00553     if(entry->totalSegs_==1)
00554     {
00555       FR_DEBUG << "FragColl: Got a DQM_Event with one segment" << endl;
00556       FR_DEBUG << "FragColl: DQM_Event size " << entry->buffer_size_ << endl;
00557       FR_DEBUG << "FragColl: DQM_Event ID " << entry->id_ << endl;
00558       FR_DEBUG << "FragColl: DQM_Event folderID " << entry->secondaryId_ << endl;
00559 
00560       DQMEventMsgView dqmEventView(entry->buffer_address_);
00561       dqmServiceManager_->manageDQMEventMsg(dqmEventView);
00562       (*buffer_deleter_)(entry);
00563       return;
00564     } // end of single segment test
00565 
00566     // verify that the segment number of the fragment is valid
00567     if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00568     {
00569       LOG4CPLUS_ERROR(applicationLogger_,
00570                       "Invalid fragment ID received for DQM event " << entry->id_
00571                       << " in run " << entry->run_ << " with folder ID of "
00572                       << entry->secondaryId_ << ", FU PID = "
00573                       << entry->originatorPid_ << ", FU GUID = "
00574                       << entry->originatorGuid_  << ".  Fragment id is "
00575                       << entry->segNumber_ << ", total number of fragments is "
00576                       << entry->totalSegs_ << ".");
00577       (*buffer_deleter_)(entry);
00578       return;
00579     }
00580 
00581     // add a new entry to the fragment area (Collection) based on this
00582     // fragment's key (or fetch the existing entry if a fragment with the
00583     // same key has already been processed)
00584     pair<Collection::iterator,bool> rc =
00585       fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00586                                               entry->secondaryId_, entry->originatorPid_,
00587                                               entry->originatorGuid_),
00588                                       FragmentContainer()));
00589 
00590     // add this fragment to the map of fragments for this event
00591     // (fragment map has key/value of fragment/segment ID and FragEntry)
00592     FragmentContainer& fragContainer = rc.first->second;
00593     std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00594     pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00595       eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00596     bool duplicateEntry = ! fragInsertResult.second;
00597 
00598     // if the specified fragment/segment ID already existed in the
00599     // map, complain and clean up
00600     if (duplicateEntry)
00601     {
00602       LOG4CPLUS_ERROR(applicationLogger_,
00603                       "Duplicate fragment ID received for DQM event " << entry->id_
00604                       << " in run " << entry->run_ << " with folder ID of "
00605                       << entry->secondaryId_ << ", FU PID = "
00606                       << entry->originatorPid_ << ", FU GUID = "
00607                       << entry->originatorGuid_  << ".  Fragment id is "
00608                       << entry->segNumber_ << ", total number of fragments is "
00609                       << entry->totalSegs_ << ".");
00610       (*buffer_deleter_)(entry);
00611       return;
00612     }
00613     // otherwise, we update the last fragment time for this event
00614     else {
00615       fragContainer.lastFragmentTime_ = time(0);
00616     }
00617 
00618     FR_DEBUG << "FragColl: added DQM fragment" << endl;
00619     
00620     if((int)eventFragmentMap.size()==entry->totalSegs_)
00621     {
00622       FR_DEBUG << "FragColl: completed a DQM_event with "
00623        << entry->totalSegs_ << " segments" << endl;
00624 
00625       // the assembleFragments method has several side-effects:
00626       // - the event_area_ is filled, and it may be resized
00627       // - the fragment entries are deleted using the buffer_deleter_
00628       assembleFragments(eventFragmentMap);
00629 
00630       // the reformed DQM data is now in event_area_ deal with it
00631       DQMEventMsgView dqmEventView(&event_area_[0]);
00632       dqmServiceManager_->manageDQMEventMsg(dqmEventView);
00633 
00634       // remove the entry from the map
00635       fragment_area_.erase(rc.first);
00636 
00637       // check for stale fragments
00638       removeStaleFragments();
00639     }
00640     ProgressMarker::instance()->processing(false);
00641   }
00642 
00643   void FragmentCollector::processErrorEvent(FragEntry* entry)
00644   {
00645     ProgressMarker::instance()->processing(true);
00646     if(entry->totalSegs_==1)
00647     {
00648         FR_DEBUG << "FragColl: Got an Error Event with one segment" << endl;
00649         FR_DEBUG << "FragColl: Event size " << entry->buffer_size_ << endl;
00650         FR_DEBUG << "FragColl: Event ID " << entry->id_ << endl;
00651 
00652         // send immediately
00653         FRDEventMsgView emsg(entry->buffer_address_);
00654         FR_DEBUG << "FragColl: writing error event size " << entry->buffer_size_ << endl;
00655         writer_->manageErrorEventMsg(catalog_, disks_, sourceId_, emsg);
00656 
00657         // make sure the buffer properly released
00658         (*buffer_deleter_)(entry);
00659         return;
00660     } // end of single segment test
00661 
00662     // verify that the segment number of the fragment is valid
00663     if (entry->segNumber_ < 1 || entry->segNumber_ > entry->totalSegs_)
00664     {
00665       LOG4CPLUS_ERROR(applicationLogger_,
00666                       "Invalid fragment ID received for Error event " << entry->id_
00667                       << " in run " << entry->run_ << " with output module ID of "
00668                       << entry->secondaryId_ << ", FU PID = "
00669                       << entry->originatorPid_ << ", FU GUID = "
00670                       << entry->originatorGuid_  << ".  Fragment id is "
00671                       << entry->segNumber_ << ", total number of fragments is "
00672                       << entry->totalSegs_ << ".");
00673       (*buffer_deleter_)(entry);
00674       return;
00675     }
00676 
00677     // add a new entry to the fragment area (Collection) based on this
00678     // fragment's key (or fetch the existing entry if a fragment with the
00679     // same key has already been processed)
00680     pair<Collection::iterator,bool> rc =
00681       fragment_area_.insert(make_pair(FragKey(entry->code_, entry->run_, entry->id_,
00682                                               entry->secondaryId_, entry->originatorPid_,
00683                                               entry->originatorGuid_),
00684                                       FragmentContainer()));
00685 
00686     // add this fragment to the map of fragments for this event
00687     // (fragment map has key/value of fragment/segment ID and FragEntry)
00688     FragmentContainer& fragContainer = rc.first->second;
00689     std::map<int, FragEntry>& eventFragmentMap = fragContainer.fragmentMap_;
00690     pair<std::map<int, FragEntry>::iterator, bool> fragInsertResult =
00691       eventFragmentMap.insert(make_pair(entry->segNumber_, *entry));
00692     bool duplicateEntry = ! fragInsertResult.second;
00693 
00694     // if the specified fragment/segment ID already existed in the
00695     // map, complain and clean up
00696     if (duplicateEntry)
00697     {
00698       LOG4CPLUS_ERROR(applicationLogger_,
00699                       "Duplicate fragment ID received for Error event " << entry->id_
00700                       << " in run " << entry->run_ << " with output module ID of "
00701                       << entry->secondaryId_ << ", FU PID = "
00702                       << entry->originatorPid_ << ", FU GUID = "
00703                       << entry->originatorGuid_  << ".  Fragment id is "
00704                       << entry->segNumber_ << ", total number of fragments is "
00705                       << entry->totalSegs_ << ".");
00706       (*buffer_deleter_)(entry);
00707       return;
00708     }
00709     // otherwise, we update the last fragment time for this event
00710     else {
00711       fragContainer.lastFragmentTime_ = time(0);
00712     }
00713 
00714     FR_DEBUG << "FragColl: added Error event fragment" << endl;
00715     
00716     if((int)eventFragmentMap.size()==entry->totalSegs_)
00717     {
00718         FR_DEBUG << "FragColl: completed an error event with "
00719                  << entry->totalSegs_ << " segments" << endl;
00720 
00721         // the assembleFragments method has several side-effects:
00722         // - the event_area_ is filled, and it may be resized
00723         // - the fragment entries are deleted using the buffer_deleter_
00724         int assembledSize = assembleFragments(eventFragmentMap);
00725 
00726         FRDEventMsgView emsg(&event_area_[0]);
00727         FR_DEBUG << "FragColl: writing error event size " << assembledSize << endl;
00728         writer_->manageErrorEventMsg(catalog_, disks_, sourceId_, emsg);
00729 
00730         // remove the entry from the map
00731         fragment_area_.erase(rc.first);
00732 
00733         // check for stale fragments
00734         removeStaleFragments();
00735     }
00736     ProgressMarker::instance()->processing(false);
00737   }
00738 
00752   int FragmentCollector::assembleFragments(std::map<int, FragEntry>& fragmentMap)
00753   {
00754     // first make sure we have enough room; use an overestimate
00755     unsigned int max_sizePerFrame = fragmentMap[1].buffer_size_;
00756     if((fragmentMap.size() * max_sizePerFrame) > event_area_.capacity())
00757     {
00758       event_area_.resize(fragmentMap.size() * max_sizePerFrame);
00759     }
00760     unsigned char* pos = (unsigned char*)&event_area_[0];
00761         
00762     int sum=0;
00763     unsigned int lastpos=0;
00764     for (unsigned int idx = 1; idx <= fragmentMap.size(); ++idx)
00765     {
00766       FragEntry& workingEntry = fragmentMap[idx];
00767       int dsize = workingEntry.buffer_size_;
00768       sum+=dsize;
00769       unsigned char* from=(unsigned char*)workingEntry.buffer_address_;
00770       copy(from,from+dsize,pos+lastpos);
00771       lastpos = lastpos + dsize;
00772       // ask deleter to kill off the buffer
00773       (*buffer_deleter_)(&workingEntry);
00774     }
00775 
00776     return sum;
00777   }
00778 
00786   int FragmentCollector::removeStaleFragments()
00787   {
00788     // if there are no entries in the fragment_area_, we know
00789     // right away that there are no stale fragments
00790     if (fragment_area_.size() == 0) {return 0;}
00791 
00792     // check if it is time to look for stale fragments
00793     // (we could have a separate interval specified for how often
00794     // to run the test, but for now, we'll just use an interval
00795     // of the stale timeout.  So, the staleFragmentTimeout is doing
00796     // double duty - it tells us how old fragments must be before
00797     // we delete them and it tells us how often to run the test of
00798     // whether any stale fragments exist.
00799     time_t now = time(0);
00800     if ((now - lastStaleCheckTime_) < staleFragmentTimeout_) {return 0;}
00801 
00802     lastStaleCheckTime_ = now;
00803     //LOG4CPLUS_INFO(applicationLogger_,
00804     //               "Running the stale fragment test at " << now
00805     //               << ", number of entries in the fragment area is "
00806     //               << fragment_area_.size() << ".");
00807 
00808     //  build up a list of events that need to be removed
00809     std::vector<FragKey> staleList;
00810     Collection::iterator areaIter;
00811     for (areaIter = fragment_area_.begin();
00812          areaIter != fragment_area_.end();
00813          ++areaIter)
00814     {
00815       FragmentContainer& fragContainer = areaIter->second;
00816       std::map<int, FragEntry>::iterator fragIter =
00817         fragContainer.fragmentMap_.begin();
00818       FragEntry& workingEntry = fragIter->second;
00819       //LOG4CPLUS_INFO(applicationLogger_,
00820       //               "Testing if the fragments for event " << workingEntry.id_
00821       //               << " in run " << workingEntry.run_ << " are stale.  "
00822       //               << "Now = " << now << ", lastFragmentTime = "
00823       //               << fragContainer.lastFragmentTime_ << ".");
00824       // remember, the granularity of the times is a (large) one second
00825       if (fragContainer.lastFragmentTime_ > 0 &&
00826           fragContainer.lastFragmentTime_ >= fragContainer.creationTime_ &&
00827           (now - fragContainer.lastFragmentTime_) > staleFragmentTimeout_)
00828       {
00829         LOG4CPLUS_WARN(applicationLogger_,
00830                        "Deleting a stale fragment set for event "
00831                        << workingEntry.id_ << " in run " << workingEntry.run_
00832                        << " with secondary ID of " << workingEntry.secondaryId_
00833                        << ", FU PID = " << workingEntry.originatorPid_
00834                        << ", FU GUID = " << workingEntry.originatorGuid_
00835                        << ".  The number of fragments received was "
00836                        << fragContainer.fragmentMap_.size()
00837                        << ", and the total number of fragments expected was "
00838                        << workingEntry.totalSegs_ << ".");
00839         staleList.push_back(areaIter->first);
00840       }
00841     }
00842 
00843     // actually do the removal
00844     for (unsigned int idx = 0; idx < staleList.size(); ++idx)
00845     {
00846       fragment_area_.erase(staleList[idx]);
00847     }
00848 
00849     return staleList.size();
00850   }
00851 }

Generated on Tue Jun 9 17:34:57 2009 for CMSSW by  doxygen 1.5.4