CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_0/src/EventFilter/StorageManager/src/ChainData.cc

Go to the documentation of this file.
00001 // $Id: ChainData.cc,v 1.20 2012/04/20 10:48:01 mommsen Exp $
00003 
00004 #include "IOPool/Streamer/interface/MsgHeader.h"
00005 
00006 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00007 
00008 #include "EventFilter/StorageManager/interface/Utils.h"
00009 #include "EventFilter/StorageManager/interface/DQMKey.h"
00010 #include "EventFilter/StorageManager/interface/StreamID.h"
00011 #include "EventFilter/StorageManager/interface/QueueID.h"
00012 #include "EventFilter/StorageManager/interface/Exception.h"
00013 
00014 #include "EventFilter/StorageManager/src/ChainData.h"
00015 
00016 #include "interface/shared/i2oXFunctionCodes.h"
00017 #include "interface/shared/version.h"
00018 
00019 #include <stdlib.h>
00020 #include "zlib.h"
00021 
00022 
00023 using namespace stor;
00024 
00025 // A ChainData object may or may not contain a Reference.
00026 detail::ChainData::ChainData(const unsigned short i2oMessageCode,
00027                              const unsigned int messageCode) :
00028   streamTags_(),
00029   eventConsumerTags_(),
00030   dqmEventConsumerTags_(),
00031   ref_(0),
00032   complete_(false),
00033   faultyBits_(INCOMPLETE_MESSAGE),
00034   messageCode_(messageCode),
00035   i2oMessageCode_(i2oMessageCode),
00036   fragKey_(Header::INVALID,0,0,0,0,0),
00037   fragmentCount_(0),
00038   expectedNumberOfFragments_(0),
00039   rbBufferId_(0),
00040   hltLocalId_(0),
00041   hltInstance_(0),
00042   hltTid_(0),
00043   fuProcessId_(0),
00044   fuGuid_(0)
00045 {
00046   #ifdef STOR_DEBUG_CORRUPT_MESSAGES
00047   double r = rand()/static_cast<double>(RAND_MAX);
00048   if (r < 0.001)
00049   {
00050     // std::cout << "Simulating corrupt I2O message" << std::endl;
00051     // markCorrupt();
00052   }
00053   else if (r < 0.02)
00054   {
00055     std::cout << "Simulating faulty I2O message" << std::endl;
00056     markFaulty();
00057   }
00058   #endif // STOR_DEBUG_CORRUPT_MESSAGES
00059 }
00060 
00061 // A ChainData that has a Reference is in charge of releasing
00062 // it. Because releasing a Reference can throw an exception, we have
00063 // to be prepared to swallow the exception. This is fairly gross,
00064 // because we lose any exception information. But allowing an
00065 // exception to escape from a destructor is even worse, so we must
00066 // do what we must do.
00067 //
00068 detail::ChainData::~ChainData()
00069 {
00070   if (ref_) 
00071     {
00072       //std::cout << std::endl << std::endl << std::hex
00073       //          << "### releasing 0x" << ((int) ref_)
00074       //          << std::dec << std::endl << std::endl;
00075       try { ref_->release(); }
00076       catch (...) { /* swallow any exception. */ }
00077     }
00078 }
00079 
00080 bool detail::ChainData::empty() const
00081 {
00082   return !ref_;
00083 }
00084 
00085 bool detail::ChainData::complete() const
00086 {
00087   return complete_;
00088 }
00089 
00090 bool detail::ChainData::faulty() const
00091 {
00092   if (complete_)
00093     return (faultyBits_ != 0);
00094   else
00095     return (faultyBits_ != INCOMPLETE_MESSAGE);
00096 }
00097 
00098 unsigned int detail::ChainData::faultyBits() const
00099 {
00100   return faultyBits_;
00101 }
00102 
00103 bool detail::ChainData::parsable() const
00104 {
00105   return (ref_) && 
00106     ((faultyBits_ & INVALID_INITIAL_REFERENCE & ~INCOMPLETE_MESSAGE) == 0) &&
00107     ((faultyBits_ & CORRUPT_INITIAL_HEADER & ~INCOMPLETE_MESSAGE) == 0);
00108 }
00109 
00110 bool detail::ChainData::headerOkay() const
00111 {
00112   return ( (faultyBits_ & ~WRONG_CHECKSUM) == 0);
00113 }
00114 
00115 void detail::ChainData::addFirstFragment(toolbox::mem::Reference* pRef)
00116 {
00117   if (ref_)
00118   {
00119     XCEPT_RAISE(stor::exception::I2OChain, "Cannot add a first fragment to a non-empty I2OChain.");
00120   }
00121   ref_ = pRef;
00122 
00123   // Avoid the situation in which all unparsable chains
00124   // have the same fragment key.  We do this by providing a 
00125   // variable default value for one of the fragKey fields.
00126   if (pRef)
00127     {
00128       fragKey_.secondaryId_ = static_cast<uint32_t>(
00129         (uintptr_t)pRef->getDataLocation()
00130       );
00131     }
00132   else
00133     {
00134       fragKey_.secondaryId_ = static_cast<uint32_t>( time(0) );
00135     }
00136 
00137   if (pRef)
00138     {
00139       creationTime_ = utils::getCurrentTime();
00140       lastFragmentTime_ = creationTime_;
00141       staleWindowStartTime_ = creationTime_;
00142 
00143       // first fragment in Reference chain
00144       ++fragmentCount_;
00145       int workingIndex = -1;
00146 
00147       if (validateDataLocation(pRef, INVALID_INITIAL_REFERENCE) &&
00148           validateMessageSize(pRef, CORRUPT_INITIAL_HEADER) &&
00149           validateFragmentIndexAndCount(pRef, CORRUPT_INITIAL_HEADER))
00150         {
00151           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00152             (I2O_SM_MULTIPART_MESSAGE_FRAME*) pRef->getDataLocation();
00153           expectedNumberOfFragments_ = smMsg->numFrames;
00154           validateFragmentOrder(pRef, workingIndex);
00155         }
00156 
00157       // subsequent fragments in Reference chain
00158       toolbox::mem::Reference* curRef = pRef->getNextReference();
00159       while (curRef)
00160         {
00161           ++fragmentCount_;
00162           
00163           if (validateDataLocation(curRef, INVALID_SECONDARY_REFERENCE) &&
00164               validateMessageSize(curRef, CORRUPT_SECONDARY_HEADER) &&
00165               validateFragmentIndexAndCount(curRef, CORRUPT_SECONDARY_HEADER))
00166             {
00167               validateExpectedFragmentCount(curRef, TOTAL_COUNT_MISMATCH);
00168               validateFragmentOrder(curRef, workingIndex);
00169               validateMessageCode(curRef, i2oMessageCode_);
00170             }
00171           
00172           curRef = curRef->getNextReference();
00173         }
00174     }
00175 
00176   checkForCompleteness();
00177 }
00178 
00179 void detail::ChainData::addToChain(ChainData const& newpart)
00180 {
00181   if ( this->empty() )
00182   {
00183     addFirstFragment(newpart.ref_);
00184     return;
00185   }
00186 
00187   if (parsable() && newpart.parsable())
00188   {
00189     // loop over the fragments in the new part
00190     toolbox::mem::Reference* newRef = newpart.ref_;
00191     while (newRef)
00192     {
00193       // unlink the next element in the new chain from that chain
00194       toolbox::mem::Reference* nextNewRef = newRef->getNextReference();
00195       newRef->setNextReference(0);
00196       
00197       // if the new fragment that we're working with is the first one
00198       // in its chain, we need to duplicate it (now that it is unlinked)
00199       // This is necessary since it is still being managed by an I2OChain
00200       // somewhere.  The subsequent fragments in the new part do not
00201       // need to be duplicated since we explicitly unlinked them from
00202       // the first one.
00203       if (newRef == newpart.ref_) {newRef = newpart.ref_->duplicate();}
00204       
00205       // we want to track whether the fragment was added (it *always* should be)
00206       bool fragmentWasAdded = false;
00207       
00208       // determine the index of the new fragment
00209       I2O_SM_MULTIPART_MESSAGE_FRAME *thatMsg =
00210         (I2O_SM_MULTIPART_MESSAGE_FRAME*) newRef->getDataLocation();
00211       unsigned int newIndex = thatMsg->frameCount;
00212       //std::cout << "newIndex = " << newIndex << std::endl;
00213       
00214       // verify that the total fragment counts match
00215       unsigned int newFragmentTotalCount = thatMsg->numFrames;
00216       if (newFragmentTotalCount != expectedNumberOfFragments_)
00217       {
00218         faultyBits_ |= TOTAL_COUNT_MISMATCH;
00219       }
00220       
00221       // if the new fragment goes at the head of the chain, handle that here
00222       I2O_SM_MULTIPART_MESSAGE_FRAME *fragMsg =
00223         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00224       unsigned int firstIndex = fragMsg->frameCount;
00225       //std::cout << "firstIndex = " << firstIndex << std::endl;
00226       if (newIndex < firstIndex)
00227       {
00228         newRef->setNextReference(ref_);
00229         ref_ = newRef;
00230         fragmentWasAdded = true;
00231       }
00232 
00233       else
00234       {
00235         // loop over the existing fragments and insert the new one
00236         // in the correct place
00237         toolbox::mem::Reference* curRef = ref_;
00238         for (unsigned int idx = 0; idx < fragmentCount_; ++idx)
00239         {
00240           // if we have a duplicate fragment, add it after the existing
00241           // one and indicate the error
00242           I2O_SM_MULTIPART_MESSAGE_FRAME *curMsg =
00243             (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00244           unsigned int curIndex = curMsg->frameCount;
00245           //std::cout << "curIndex = " << curIndex << std::endl;
00246           if (newIndex == curIndex) 
00247           {
00248             faultyBits_ |= DUPLICATE_FRAGMENT;
00249             newRef->setNextReference(curRef->getNextReference());
00250             curRef->setNextReference(newRef);
00251             fragmentWasAdded = true;
00252             break;
00253           }
00254           
00255           // if we have reached the end of the chain, add the
00256           // new fragment to the end
00257           //std::cout << "nextRef = " << ((int) nextRef) << std::endl;
00258           toolbox::mem::Reference* nextRef = curRef->getNextReference();
00259           if (nextRef == 0)
00260           {
00261             curRef->setNextReference(newRef);
00262             fragmentWasAdded = true;
00263             break;
00264           }
00265           
00266           I2O_SM_MULTIPART_MESSAGE_FRAME *nextMsg =
00267             (I2O_SM_MULTIPART_MESSAGE_FRAME*) nextRef->getDataLocation();
00268           unsigned int nextIndex = nextMsg->frameCount;
00269           //std::cout << "nextIndex = " << nextIndex << std::endl;
00270           if (newIndex > curIndex && newIndex < nextIndex)
00271           {
00272             newRef->setNextReference(curRef->getNextReference());
00273             curRef->setNextReference(newRef);
00274             fragmentWasAdded = true;
00275             break;
00276           }
00277           
00278           curRef = nextRef;
00279         }
00280       }
00281       
00282       // update the fragment count and check if the chain is now complete
00283       if (!fragmentWasAdded)
00284       {
00285         // this should never happen - if it does, there is a logic
00286         // error in the loop above
00287         XCEPT_RAISE(stor::exception::I2OChain,
00288           "A fragment was unable to be added to a chain.");
00289       }
00290       ++fragmentCount_;
00291       
00292       newRef = nextNewRef;
00293     }
00294   }
00295   else
00296   {
00297     // if either the current chain or the newpart are nor parsable,
00298     // we simply append the new stuff to the end of the existing chain
00299     
00300     // update our fragment count to include the new fragments
00301     toolbox::mem::Reference* curRef = newpart.ref_;
00302     while (curRef) {
00303       ++fragmentCount_;
00304       curRef = curRef->getNextReference();
00305     }
00306     
00307     // append the new fragments to the end of the existing chain
00308     toolbox::mem::Reference* lastRef = ref_;
00309     curRef = ref_->getNextReference();
00310     while (curRef) {
00311       lastRef = curRef;
00312       curRef = curRef->getNextReference();
00313     }
00314     lastRef->setNextReference(newpart.ref_->duplicate());
00315     
00316     // update the time stamps
00317     lastFragmentTime_ = utils::getCurrentTime();
00318     staleWindowStartTime_ = lastFragmentTime_;
00319     if (newpart.creationTime() < creationTime_)
00320     {
00321       creationTime_ = newpart.creationTime();
00322     }
00323     
00324     return;
00325   }
00326 
00327   // merge the faulty flags from the new part into the existing flags
00328   faultyBits_ |= newpart.faultyBits_;
00329 
00330   // update the time stamps
00331   lastFragmentTime_ = utils::getCurrentTime();
00332   staleWindowStartTime_ = lastFragmentTime_;
00333   if (newpart.creationTime() < creationTime_)
00334   {
00335     creationTime_ = newpart.creationTime();
00336   }
00337   
00338   checkForCompleteness();
00339 }
00340 
00341 void detail::ChainData::checkForCompleteness()
00342 {
00343   if ((fragmentCount_ == expectedNumberOfFragments_) &&
00344     ((faultyBits_ & (TOTAL_COUNT_MISMATCH | FRAGMENTS_OUT_OF_ORDER | DUPLICATE_FRAGMENT)) == 0))
00345     markComplete();
00346 }
00347 
00348 void detail::ChainData::markComplete()
00349 {
00350   faultyBits_ &= ~INCOMPLETE_MESSAGE; // reset incomplete bit
00351   complete_ = true;
00352   validateAdler32Checksum();
00353 }
00354 
00355 void detail::ChainData::markFaulty()
00356 {
00357   faultyBits_ |= EXTERNALLY_REQUESTED;
00358 }
00359 
00360 void detail::ChainData::markCorrupt()
00361 {
00362   faultyBits_ |= CORRUPT_INITIAL_HEADER;
00363 }
00364 
00365 unsigned long* detail::ChainData::getBufferData() const
00366 {
00367   return ref_ 
00368     ?  static_cast<unsigned long*>(ref_->getDataLocation()) 
00369     : 0UL;
00370 }
00371 
00372 void detail::ChainData::swap(ChainData& other)
00373 {
00374   streamTags_.swap(other.streamTags_);
00375   eventConsumerTags_.swap(other.eventConsumerTags_);
00376   dqmEventConsumerTags_.swap(other.dqmEventConsumerTags_);
00377   std::swap(ref_, other.ref_);
00378   std::swap(complete_, other.complete_);
00379   std::swap(faultyBits_, other.faultyBits_);
00380   std::swap(messageCode_, other.messageCode_);
00381   std::swap(i2oMessageCode_, other.i2oMessageCode_);
00382   std::swap(fragKey_, other.fragKey_);
00383   std::swap(fragmentCount_, other.fragmentCount_);
00384   std::swap(expectedNumberOfFragments_, other.expectedNumberOfFragments_);
00385   std::swap(creationTime_, other.creationTime_);
00386   std::swap(lastFragmentTime_, other.lastFragmentTime_);
00387   std::swap(staleWindowStartTime_, other.staleWindowStartTime_);
00388 }
00389 
00390 size_t detail::ChainData::memoryUsed() const
00391 {
00392   size_t memoryUsed = 0;
00393   toolbox::mem::Reference* curRef = ref_;
00394   while (curRef)
00395     {
00396       memoryUsed += curRef->getDataSize();
00397       curRef = curRef->getNextReference();
00398     }
00399   return memoryUsed;
00400 }
00401 
00402 unsigned long detail::ChainData::totalDataSize() const
00403 {
00404   unsigned long totalSize = 0;
00405   toolbox::mem::Reference* curRef = ref_;
00406   for (unsigned int idx = 0; idx < fragmentCount_; ++idx)
00407     {
00408       I2O_MESSAGE_FRAME *i2oMsg =
00409         (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
00410       if (!faulty())
00411         {
00412           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00413             (I2O_SM_MULTIPART_MESSAGE_FRAME*) i2oMsg;
00414           totalSize += smMsg->dataSize;
00415         }
00416       else if (i2oMsg)
00417         {
00418           totalSize += (i2oMsg->MessageSize*4);
00419         }
00420       
00421       curRef = curRef->getNextReference();
00422     }
00423   return totalSize;
00424 }
00425 
00426 unsigned long detail::ChainData::dataSize(int fragmentIndex) const
00427 {
00428   toolbox::mem::Reference* curRef = ref_;
00429   for (int idx = 0; idx < fragmentIndex; ++idx)
00430     {
00431       curRef = curRef->getNextReference();
00432     }
00433   
00434   I2O_MESSAGE_FRAME *i2oMsg =
00435     (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
00436   if (!faulty())
00437     {
00438       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00439         (I2O_SM_MULTIPART_MESSAGE_FRAME*) i2oMsg;
00440       return smMsg->dataSize;
00441     }
00442   else if (i2oMsg)
00443     {
00444       return (i2oMsg->MessageSize*4);
00445     }
00446   return 0;
00447 }
00448 
00449 unsigned char* detail::ChainData::dataLocation(int fragmentIndex) const
00450 {
00451   toolbox::mem::Reference* curRef = ref_;
00452   for (int idx = 0; idx < fragmentIndex; ++idx)
00453     {
00454       curRef = curRef->getNextReference();
00455     }
00456   
00457   if (!faulty())
00458     {
00459       return do_fragmentLocation(static_cast<unsigned char*>
00460                                  (curRef->getDataLocation()));
00461     }
00462   else
00463     {
00464       return static_cast<unsigned char*>(curRef->getDataLocation());
00465     }
00466 }
00467 
00468 unsigned int detail::ChainData::getFragmentID(int fragmentIndex) const
00469 {
00470   toolbox::mem::Reference* curRef = ref_;
00471   for (int idx = 0; idx < fragmentIndex; ++idx)
00472     {
00473       curRef = curRef->getNextReference();
00474     }
00475   
00476   if (parsable())
00477     {
00478       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00479         (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00480       return smMsg->frameCount;
00481     }
00482   else
00483     {
00484       return 0;
00485     }
00486 }
00487 
00488 unsigned int detail::ChainData::
00489 copyFragmentsIntoBuffer(std::vector<unsigned char>& targetBuffer) const
00490 {
00491   unsigned long fullSize = totalDataSize();
00492   if (targetBuffer.capacity() < fullSize)
00493     {
00494       targetBuffer.resize(fullSize);
00495     }
00496   unsigned char* targetLoc = (unsigned char*)&targetBuffer[0];
00497 
00498   toolbox::mem::Reference* curRef = ref_;
00499   while (curRef)
00500     {
00501       unsigned char* fragmentLoc =
00502         (unsigned char*) curRef->getDataLocation();
00503       unsigned long sourceSize = 0;
00504       unsigned char* sourceLoc = 0;
00505 
00506       if (!faulty())
00507         {
00508           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00509             (I2O_SM_MULTIPART_MESSAGE_FRAME*) fragmentLoc;
00510           sourceSize = smMsg->dataSize;
00511           sourceLoc = do_fragmentLocation(fragmentLoc);
00512         }
00513       else if (fragmentLoc)
00514         {
00515           I2O_MESSAGE_FRAME *i2oMsg = (I2O_MESSAGE_FRAME*) fragmentLoc;
00516           sourceSize = i2oMsg->MessageSize * 4;
00517           sourceLoc = fragmentLoc;
00518         }
00519 
00520       if (sourceSize > 0)
00521         {
00522           std::copy(sourceLoc, sourceLoc+sourceSize, targetLoc);
00523           targetLoc += sourceSize;
00524         }
00525 
00526       curRef = curRef->getNextReference();
00527     }
00528 
00529   return static_cast<unsigned int>(fullSize);
00530 }
00531 
00532 unsigned long detail::ChainData::headerSize() const
00533 {
00534   return do_headerSize();
00535 }
00536 
00537 unsigned char* detail::ChainData::headerLocation() const
00538 {
00539   return do_headerLocation();
00540 }
00541 
00542 std::string detail::ChainData::hltURL() const
00543 {
00544   if (parsable())
00545     {
00546       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00547         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00548       size_t size = std::min(strlen(smMsg->hltURL),
00549                              (size_t) MAX_I2O_SM_URLCHARS);
00550       std::string URL(smMsg->hltURL, size);
00551       return URL;
00552     }
00553   else
00554     {
00555       return "unavailable";
00556     }
00557 }
00558 
00559 std::string detail::ChainData::hltClassName() const
00560 {
00561   if (parsable())
00562     {
00563       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00564         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00565       size_t size = std::min(strlen(smMsg->hltClassName),
00566                              (size_t) MAX_I2O_SM_URLCHARS);
00567       std::string className(smMsg->hltClassName, size);
00568       return className;
00569     }
00570   else
00571     {
00572       return "unavailable";
00573     }
00574 }
00575 
00576 std::string detail::ChainData::outputModuleLabel() const
00577 {
00578   return do_outputModuleLabel();
00579 }
00580 
00581 uint32_t detail::ChainData::outputModuleId() const
00582 {
00583   return do_outputModuleId();
00584 }
00585 
00586 uint32_t detail::ChainData::nExpectedEPs() const
00587 {
00588   return do_nExpectedEPs();
00589 }
00590 
00591 std::string detail::ChainData::topFolderName() const
00592 {
00593   return do_topFolderName();
00594 }
00595 
00596 DQMKey detail::ChainData::dqmKey() const
00597 {
00598   return do_dqmKey();
00599 }
00600 
00601 void detail::ChainData::hltTriggerNames(Strings& nameList) const
00602 {
00603   do_hltTriggerNames(nameList);
00604 }
00605 
00606 void detail::ChainData::hltTriggerSelections(Strings& nameList) const
00607 {
00608   do_hltTriggerSelections(nameList);
00609 }
00610 
00611 void detail::ChainData::l1TriggerNames(Strings& nameList) const
00612 {
00613   do_l1TriggerNames(nameList);
00614 }
00615 
00616 uint32_t detail::ChainData::hltTriggerCount() const
00617 {
00618   return do_hltTriggerCount();
00619 }
00620 
00621 void
00622 detail::ChainData::hltTriggerBits(std::vector<unsigned char>& bitList) const
00623 {
00624   do_hltTriggerBits(bitList);
00625 }
00626 
00627 void detail::ChainData::assertRunNumber(uint32_t runNumber)
00628 {
00629   do_assertRunNumber(runNumber);
00630 }
00631 
00632 uint32_t detail::ChainData::runNumber() const
00633 {
00634   return do_runNumber();
00635 }
00636 
00637 uint32_t detail::ChainData::lumiSection() const
00638 {
00639   return do_lumiSection();
00640 }
00641 
00642 uint32_t detail::ChainData::eventNumber() const
00643 {
00644   return do_eventNumber();
00645 }
00646 
00647 uint32_t detail::ChainData::adler32Checksum() const
00648 {
00649   return do_adler32Checksum();
00650 }
00651 
00652 void detail::ChainData::tagForStream(StreamID streamId)
00653 {
00654   streamTags_.push_back(streamId);
00655 }
00656 
00657 void detail::ChainData::tagForEventConsumer(QueueID queueId)
00658 {
00659   eventConsumerTags_.push_back(queueId);
00660 }
00661 
00662 void detail::ChainData::tagForDQMEventConsumer(QueueID queueId)
00663 {
00664   dqmEventConsumerTags_.push_back(queueId);
00665 }
00666 
00667 std::vector<StreamID> const& detail::ChainData::getStreamTags() const
00668 {
00669   return streamTags_;
00670 }
00671 
00672 QueueIDs const& detail::ChainData::getEventConsumerTags() const
00673 {
00674   return eventConsumerTags_;
00675 }
00676 
00677 QueueIDs const& detail::ChainData::getDQMEventConsumerTags() const
00678 {
00679   return dqmEventConsumerTags_;
00680 }
00681 
00682 unsigned int detail::ChainData::droppedEventsCount() const
00683 {
00684   return do_droppedEventsCount();
00685 }
00686 
00687 void detail::ChainData::setDroppedEventsCount(unsigned int count)
00688 {
00689   do_setDroppedEventsCount(count);
00690 }
00691 
00692 bool detail::ChainData::isEndOfLumiSectionMessage() const
00693 {
00694   return ( i2oMessageCode_ == I2O_EVM_LUMISECTION );
00695 }
00696 
00697 bool
00698 detail::ChainData::validateDataLocation(toolbox::mem::Reference* ref,
00699                                         BitMasksForFaulty maskToUse)
00700 {
00701   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00702     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00703   if (!pvtMsg)
00704     {
00705       faultyBits_ |= maskToUse;
00706       return false;
00707     }
00708   return true;
00709 }
00710 
00711 bool
00712 detail::ChainData::validateMessageSize(toolbox::mem::Reference* ref,
00713                                        BitMasksForFaulty maskToUse)
00714 {
00715   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00716     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00717   if ((size_t)(pvtMsg->StdMessageFrame.MessageSize*4) <
00718       sizeof(I2O_SM_MULTIPART_MESSAGE_FRAME))
00719     {
00720       faultyBits_ |= maskToUse;
00721       return false;
00722     }
00723   return true;
00724 }
00725 
00726 bool
00727 detail::ChainData::validateFragmentIndexAndCount(toolbox::mem::Reference* ref,
00728                                                  BitMasksForFaulty maskToUse)
00729 {
00730   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00731     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00732   if (smMsg->numFrames < 1 || smMsg->frameCount >= smMsg->numFrames)
00733     {
00734       faultyBits_ |= maskToUse;
00735       return false;
00736     }
00737   return true;
00738 }
00739 
00740 bool
00741 detail::ChainData::validateExpectedFragmentCount(toolbox::mem::Reference* ref,
00742                                                  BitMasksForFaulty maskToUse)
00743 {
00744   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00745     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00746   if (smMsg->numFrames != expectedNumberOfFragments_)
00747     {
00748       faultyBits_ |= maskToUse;
00749       return false;
00750     }
00751   return true;
00752 }
00753 
00754 bool
00755 detail::ChainData::validateFragmentOrder(toolbox::mem::Reference* ref,
00756                                          int& indexValue)
00757 {
00758   int problemCount = 0;
00759   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00760     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00761   int thisIndex = static_cast<int>(smMsg->frameCount);
00762   if (thisIndex == indexValue)
00763     {
00764       faultyBits_ |= DUPLICATE_FRAGMENT;
00765       ++problemCount;
00766     }
00767   else if (thisIndex < indexValue)
00768     {
00769       faultyBits_ |= FRAGMENTS_OUT_OF_ORDER;
00770       ++problemCount;
00771     }
00772   indexValue = thisIndex;
00773   return (problemCount == 0);
00774 }
00775 
00776 bool
00777 detail::ChainData::validateMessageCode(toolbox::mem::Reference* ref,
00778                                        unsigned short expectedI2OMessageCode)
00779 {
00780   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00781     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00782   if (pvtMsg->XFunctionCode != expectedI2OMessageCode)
00783     {
00784       faultyBits_ |= CORRUPT_SECONDARY_HEADER;
00785       return false;
00786     }
00787   return true;
00788 }
00789 
00790 bool detail::ChainData::validateAdler32Checksum()
00791 {
00792   if ( !complete() || !headerOkay() ) return false;
00793 
00794   const uint32_t expected = adler32Checksum();
00795   if (expected == 0) return false; // Adler32 not available
00796 
00797   const uint32_t calculated = calculateAdler32();
00798 
00799   if ( calculated != expected )
00800   {
00801     faultyBits_ |= WRONG_CHECKSUM;
00802     return false;
00803   }
00804   return true;
00805 }
00806 
00807 uint32_t detail::ChainData::calculateAdler32() const
00808 {
00809   uint32_t adler = adler32(0L, 0, 0);
00810   
00811   toolbox::mem::Reference* curRef = ref_;
00812 
00813   I2O_SM_MULTIPART_MESSAGE_FRAME* smMsg =
00814     (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00815 
00816   //skip event header in first fragment
00817   const unsigned long headerSize = do_headerSize();
00818   unsigned long offset = 0;
00819 
00820   const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
00821   if ( headerSize > payloadSize )
00822   {
00823     // Header continues into next fragment
00824     offset = headerSize - payloadSize;
00825   }
00826   else
00827   {
00828     const unsigned char* dataLocation = 
00829       do_fragmentLocation((unsigned char*)curRef->getDataLocation()) + headerSize;
00830     adler = adler32(adler, dataLocation, smMsg->dataSize - headerSize);
00831   }
00832 
00833   curRef = curRef->getNextReference();
00834 
00835   while (curRef)
00836   {
00837     smMsg = (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00838 
00839     const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
00840     if ( offset > payloadSize )
00841     {
00842       offset -= payloadSize;
00843     }
00844     else
00845     {
00846       const unsigned char* dataLocation =
00847         do_fragmentLocation((unsigned char*)curRef->getDataLocation()) + offset;
00848       adler = adler32(adler, dataLocation, smMsg->dataSize - offset);
00849       offset = 0;
00850     }
00851 
00852     curRef = curRef->getNextReference();
00853   }
00854 
00855   return adler;
00856 }
00857 
00858 std::string detail::ChainData::do_outputModuleLabel() const
00859 {
00860   std::stringstream msg;
00861   msg << "An output module label is only available from a valid, ";
00862   msg << "complete INIT message.";
00863   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00864 }
00865 
00866 uint32_t detail::ChainData::do_outputModuleId() const
00867 {
00868   std::stringstream msg;
00869   msg << "An output module ID is only available from a valid, ";
00870   msg << "complete INIT or Event message.";
00871   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00872 }
00873 
00874 uint32_t detail::ChainData::do_nExpectedEPs() const
00875 {
00876   std::stringstream msg;
00877   msg << "The number of slave EPs is only available from a valid, ";
00878   msg << "complete INIT message.";
00879   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00880 }
00881 
00882 std::string detail::ChainData::do_topFolderName() const
00883 {
00884   std::stringstream msg;
00885   msg << "A top folder name is only available from a valid, ";
00886   msg << "complete DQM event message.";
00887   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00888 }
00889 
00890 DQMKey detail::ChainData::do_dqmKey() const
00891 {
00892   std::stringstream msg;
00893   msg << "The DQM key is only available from a valid, ";
00894   msg << "complete DQM event message.";
00895   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00896 }
00897 
00898 void detail::ChainData::do_hltTriggerNames(Strings& nameList) const
00899 {
00900   std::stringstream msg;
00901   msg << "The HLT trigger names are only available from a valid, ";
00902   msg << "complete INIT message.";
00903   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00904 }
00905 
00906 void detail::ChainData::do_hltTriggerSelections(Strings& nameList) const
00907 {
00908   std::stringstream msg;
00909   msg << "The HLT trigger selections are only available from a valid, ";
00910   msg << "complete INIT message.";
00911   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00912 }
00913 
00914 void detail::ChainData::do_l1TriggerNames(Strings& nameList) const
00915 {
00916   std::stringstream msg;
00917   msg << "The L1 trigger names are only available from a valid, ";
00918   msg << "complete INIT message.";
00919   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00920 }
00921 
00922 uint32_t detail::ChainData::do_hltTriggerCount() const
00923 {
00924   std::stringstream msg;
00925   msg << "An HLT trigger count is only available from a valid, ";
00926   msg << "complete Event message.";
00927   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00928 }
00929 
00930 void 
00931 detail::ChainData::do_hltTriggerBits(std::vector<unsigned char>& bitList) const
00932 {
00933   std::stringstream msg;
00934   msg << "The HLT trigger bits are only available from a valid, ";
00935   msg << "complete Event message.";
00936   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00937 }
00938 
00939 unsigned int
00940 detail::ChainData::do_droppedEventsCount() const
00941 {
00942   std::stringstream msg;
00943   msg << "Dropped events count can only be retrieved from a valid, ";
00944   msg << "complete Event message.";
00945   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00946 }
00947 
00948 void 
00949 detail::ChainData::do_setDroppedEventsCount(unsigned int count)
00950 {
00951   std::stringstream msg;
00952   msg << "Dropped events count can only be added to a valid, ";
00953   msg << "complete Event message.";
00954   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00955 }
00956 
00957 uint32_t detail::ChainData::do_runNumber() const
00958 {
00959   std::stringstream msg;
00960   msg << "A run number is only available from a valid, ";
00961   msg << "complete EVENT or ERROR_EVENT message.";
00962   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00963 }
00964 
00965 uint32_t detail::ChainData::do_lumiSection() const
00966 {
00967   std::stringstream msg;
00968   msg << "A luminosity section is only available from a valid, ";
00969   msg << "complete EVENT or ERROR_EVENT message.";
00970   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00971 }
00972 
00973 uint32_t detail::ChainData::do_eventNumber() const
00974 {
00975   std::stringstream msg;
00976   msg << "An event number is only available from a valid, ";
00977   msg << "complete EVENT or ERROR_EVENT message.";
00978   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00979 }
00980 
00981