CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_4_5_patch3/src/EventFilter/StorageManager/src/ChainData.cc

Go to the documentation of this file.
00001 // $Id: ChainData.cc,v 1.18 2011/03/08 18:34:11 mommsen Exp $
00003 
#include "IOPool/Streamer/interface/HLTInfo.h"
#include "IOPool/Streamer/interface/MsgHeader.h"

#include "EventFilter/Utilities/interface/i2oEvfMsgs.h"

#include "EventFilter/StorageManager/interface/Utils.h"
#include "EventFilter/StorageManager/interface/DQMKey.h"
#include "EventFilter/StorageManager/interface/StreamID.h"
#include "EventFilter/StorageManager/interface/QueueID.h"
#include "EventFilter/StorageManager/interface/Exception.h"

#include "EventFilter/StorageManager/src/ChainData.h"

#include "interface/shared/i2oXFunctionCodes.h"
#include "interface/shared/version.h"

#include <stdlib.h>
#include <string.h>
#include "zlib.h"
00022 
00023 
00024 using namespace stor;
00025 
00026 // A ChainData object may or may not contain a Reference.
00027 detail::ChainData::ChainData(const unsigned short i2oMessageCode,
00028                              const unsigned int messageCode) :
00029   streamTags_(),
00030   eventConsumerTags_(),
00031   dqmEventConsumerTags_(),
00032   ref_(0),
00033   complete_(false),
00034   faultyBits_(INCOMPLETE_MESSAGE),
00035   messageCode_(messageCode),
00036   i2oMessageCode_(i2oMessageCode),
00037   fragKey_(Header::INVALID,0,0,0,0,0),
00038   fragmentCount_(0),
00039   expectedNumberOfFragments_(0),
00040   rbBufferId_(0),
00041   hltLocalId_(0),
00042   hltInstance_(0),
00043   hltTid_(0),
00044   fuProcessId_(0),
00045   fuGuid_(0)
00046 {
00047   #ifdef STOR_DEBUG_CORRUPT_MESSAGES
00048   double r = rand()/static_cast<double>(RAND_MAX);
00049   if (r < 0.001)
00050   {
00051     // std::cout << "Simulating corrupt I2O message" << std::endl;
00052     // markCorrupt();
00053   }
00054   else if (r < 0.02)
00055   {
00056     std::cout << "Simulating faulty I2O message" << std::endl;
00057     markFaulty();
00058   }
00059   #endif // STOR_DEBUG_CORRUPT_MESSAGES
00060 }
00061 
00062 // A ChainData that has a Reference is in charge of releasing
00063 // it. Because releasing a Reference can throw an exception, we have
00064 // to be prepared to swallow the exception. This is fairly gross,
00065 // because we lose any exception information. But allowing an
00066 // exception to escape from a destructor is even worse, so we must
00067 // do what we must do.
00068 //
00069 detail::ChainData::~ChainData()
00070 {
00071   if (ref_) 
00072     {
00073       //std::cout << std::endl << std::endl << std::hex
00074       //          << "### releasing 0x" << ((int) ref_)
00075       //          << std::dec << std::endl << std::endl;
00076       try { ref_->release(); }
00077       catch (...) { /* swallow any exception. */ }
00078     }
00079 }
00080 
00081 bool detail::ChainData::empty() const
00082 {
00083   return !ref_;
00084 }
00085 
00086 bool detail::ChainData::complete() const
00087 {
00088   return complete_;
00089 }
00090 
00091 bool detail::ChainData::faulty() const
00092 {
00093   if (complete_)
00094     return (faultyBits_ != 0);
00095   else
00096     return (faultyBits_ != INCOMPLETE_MESSAGE);
00097 }
00098 
00099 unsigned int detail::ChainData::faultyBits() const
00100 {
00101   return faultyBits_;
00102 }
00103 
00104 bool detail::ChainData::parsable() const
00105 {
00106   return (ref_) && 
00107     ((faultyBits_ & INVALID_INITIAL_REFERENCE & ~INCOMPLETE_MESSAGE) == 0) &&
00108     ((faultyBits_ & CORRUPT_INITIAL_HEADER & ~INCOMPLETE_MESSAGE) == 0);
00109 }
00110 
00111 bool detail::ChainData::headerOkay() const
00112 {
00113   return ( (faultyBits_ & ~WRONG_CHECKSUM) == 0);
00114 }
00115 
00116 void detail::ChainData::addFirstFragment(toolbox::mem::Reference* pRef)
00117 {
00118   if (ref_)
00119   {
00120     XCEPT_RAISE(stor::exception::I2OChain, "Cannot add a first fragment to a non-empty I2OChain.");
00121   }
00122   ref_ = pRef;
00123 
00124   // Avoid the situation in which all unparsable chains
00125   // have the same fragment key.  We do this by providing a 
00126   // variable default value for one of the fragKey fields.
00127   if (pRef)
00128     {
00129       fragKey_.secondaryId_ = static_cast<uint32_t>(
00130         (uintptr_t)pRef->getDataLocation()
00131       );
00132     }
00133   else
00134     {
00135       fragKey_.secondaryId_ = static_cast<uint32_t>( time(0) );
00136     }
00137 
00138   if (pRef)
00139     {
00140       creationTime_ = utils::getCurrentTime();
00141       lastFragmentTime_ = creationTime_;
00142       staleWindowStartTime_ = creationTime_;
00143 
00144       // first fragment in Reference chain
00145       ++fragmentCount_;
00146       int workingIndex = -1;
00147 
00148       if (validateDataLocation(pRef, INVALID_INITIAL_REFERENCE) &&
00149           validateMessageSize(pRef, CORRUPT_INITIAL_HEADER) &&
00150           validateFragmentIndexAndCount(pRef, CORRUPT_INITIAL_HEADER))
00151         {
00152           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00153             (I2O_SM_MULTIPART_MESSAGE_FRAME*) pRef->getDataLocation();
00154           expectedNumberOfFragments_ = smMsg->numFrames;
00155           validateFragmentOrder(pRef, workingIndex);
00156         }
00157 
00158       // subsequent fragments in Reference chain
00159       toolbox::mem::Reference* curRef = pRef->getNextReference();
00160       while (curRef)
00161         {
00162           ++fragmentCount_;
00163           
00164           if (validateDataLocation(curRef, INVALID_SECONDARY_REFERENCE) &&
00165               validateMessageSize(curRef, CORRUPT_SECONDARY_HEADER) &&
00166               validateFragmentIndexAndCount(curRef, CORRUPT_SECONDARY_HEADER))
00167             {
00168               validateExpectedFragmentCount(curRef, TOTAL_COUNT_MISMATCH);
00169               validateFragmentOrder(curRef, workingIndex);
00170               validateMessageCode(curRef, i2oMessageCode_);
00171             }
00172           
00173           curRef = curRef->getNextReference();
00174         }
00175     }
00176 
00177   checkForCompleteness();
00178 }
00179 
00180 void detail::ChainData::addToChain(ChainData const& newpart)
00181 {
00182   if ( this->empty() )
00183   {
00184     addFirstFragment(newpart.ref_);
00185     return;
00186   }
00187 
00188   if (parsable() && newpart.parsable())
00189   {
00190     // loop over the fragments in the new part
00191     toolbox::mem::Reference* newRef = newpart.ref_;
00192     while (newRef)
00193     {
00194       // unlink the next element in the new chain from that chain
00195       toolbox::mem::Reference* nextNewRef = newRef->getNextReference();
00196       newRef->setNextReference(0);
00197       
00198       // if the new fragment that we're working with is the first one
00199       // in its chain, we need to duplicate it (now that it is unlinked)
00200       // This is necessary since it is still being managed by an I2OChain
00201       // somewhere.  The subsequent fragments in the new part do not
00202       // need to be duplicated since we explicitly unlinked them from
00203       // the first one.
00204       if (newRef == newpart.ref_) {newRef = newpart.ref_->duplicate();}
00205       
00206       // we want to track whether the fragment was added (it *always* should be)
00207       bool fragmentWasAdded = false;
00208       
00209       // determine the index of the new fragment
00210       I2O_SM_MULTIPART_MESSAGE_FRAME *thatMsg =
00211         (I2O_SM_MULTIPART_MESSAGE_FRAME*) newRef->getDataLocation();
00212       unsigned int newIndex = thatMsg->frameCount;
00213       //std::cout << "newIndex = " << newIndex << std::endl;
00214       
00215       // verify that the total fragment counts match
00216       unsigned int newFragmentTotalCount = thatMsg->numFrames;
00217       if (newFragmentTotalCount != expectedNumberOfFragments_)
00218       {
00219         faultyBits_ |= TOTAL_COUNT_MISMATCH;
00220       }
00221       
00222       // if the new fragment goes at the head of the chain, handle that here
00223       I2O_SM_MULTIPART_MESSAGE_FRAME *fragMsg =
00224         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00225       unsigned int firstIndex = fragMsg->frameCount;
00226       //std::cout << "firstIndex = " << firstIndex << std::endl;
00227       if (newIndex < firstIndex)
00228       {
00229         newRef->setNextReference(ref_);
00230         ref_ = newRef;
00231         fragmentWasAdded = true;
00232       }
00233 
00234       else
00235       {
00236         // loop over the existing fragments and insert the new one
00237         // in the correct place
00238         toolbox::mem::Reference* curRef = ref_;
00239         for (unsigned int idx = 0; idx < fragmentCount_; ++idx)
00240         {
00241           // if we have a duplicate fragment, add it after the existing
00242           // one and indicate the error
00243           I2O_SM_MULTIPART_MESSAGE_FRAME *curMsg =
00244             (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00245           unsigned int curIndex = curMsg->frameCount;
00246           //std::cout << "curIndex = " << curIndex << std::endl;
00247           if (newIndex == curIndex) 
00248           {
00249             faultyBits_ |= DUPLICATE_FRAGMENT;
00250             newRef->setNextReference(curRef->getNextReference());
00251             curRef->setNextReference(newRef);
00252             fragmentWasAdded = true;
00253             break;
00254           }
00255           
00256           // if we have reached the end of the chain, add the
00257           // new fragment to the end
00258           //std::cout << "nextRef = " << ((int) nextRef) << std::endl;
00259           toolbox::mem::Reference* nextRef = curRef->getNextReference();
00260           if (nextRef == 0)
00261           {
00262             curRef->setNextReference(newRef);
00263             fragmentWasAdded = true;
00264             break;
00265           }
00266           
00267           I2O_SM_MULTIPART_MESSAGE_FRAME *nextMsg =
00268             (I2O_SM_MULTIPART_MESSAGE_FRAME*) nextRef->getDataLocation();
00269           unsigned int nextIndex = nextMsg->frameCount;
00270           //std::cout << "nextIndex = " << nextIndex << std::endl;
00271           if (newIndex > curIndex && newIndex < nextIndex)
00272           {
00273             newRef->setNextReference(curRef->getNextReference());
00274             curRef->setNextReference(newRef);
00275             fragmentWasAdded = true;
00276             break;
00277           }
00278           
00279           curRef = nextRef;
00280         }
00281       }
00282       
00283       // update the fragment count and check if the chain is now complete
00284       if (!fragmentWasAdded)
00285       {
00286         // this should never happen - if it does, there is a logic
00287         // error in the loop above
00288         XCEPT_RAISE(stor::exception::I2OChain,
00289           "A fragment was unable to be added to a chain.");
00290       }
00291       ++fragmentCount_;
00292       
00293       newRef = nextNewRef;
00294     }
00295   }
00296   else
00297   {
00298     // if either the current chain or the newpart are nor parsable,
00299     // we simply append the new stuff to the end of the existing chain
00300     
00301     // update our fragment count to include the new fragments
00302     toolbox::mem::Reference* curRef = newpart.ref_;
00303     while (curRef) {
00304       ++fragmentCount_;
00305       curRef = curRef->getNextReference();
00306     }
00307     
00308     // append the new fragments to the end of the existing chain
00309     toolbox::mem::Reference* lastRef = ref_;
00310     curRef = ref_->getNextReference();
00311     while (curRef) {
00312       lastRef = curRef;
00313       curRef = curRef->getNextReference();
00314     }
00315     lastRef->setNextReference(newpart.ref_->duplicate());
00316     
00317     // update the time stamps
00318     lastFragmentTime_ = utils::getCurrentTime();
00319     staleWindowStartTime_ = lastFragmentTime_;
00320     if (newpart.creationTime() < creationTime_)
00321     {
00322       creationTime_ = newpart.creationTime();
00323     }
00324     
00325     return;
00326   }
00327 
00328   // merge the faulty flags from the new part into the existing flags
00329   faultyBits_ |= newpart.faultyBits_;
00330 
00331   // update the time stamps
00332   lastFragmentTime_ = utils::getCurrentTime();
00333   staleWindowStartTime_ = lastFragmentTime_;
00334   if (newpart.creationTime() < creationTime_)
00335   {
00336     creationTime_ = newpart.creationTime();
00337   }
00338   
00339   checkForCompleteness();
00340 }
00341 
00342 void detail::ChainData::checkForCompleteness()
00343 {
00344   if ((fragmentCount_ == expectedNumberOfFragments_) &&
00345     ((faultyBits_ & (TOTAL_COUNT_MISMATCH | FRAGMENTS_OUT_OF_ORDER | DUPLICATE_FRAGMENT)) == 0))
00346     markComplete();
00347 }
00348 
00349 void detail::ChainData::markComplete()
00350 {
00351   faultyBits_ &= ~INCOMPLETE_MESSAGE; // reset incomplete bit
00352   complete_ = true;
00353   validateAdler32Checksum();
00354 }
00355 
00356 void detail::ChainData::markFaulty()
00357 {
00358   faultyBits_ |= EXTERNALLY_REQUESTED;
00359 }
00360 
00361 void detail::ChainData::markCorrupt()
00362 {
00363   faultyBits_ |= CORRUPT_INITIAL_HEADER;
00364 }
00365 
00366 unsigned long* detail::ChainData::getBufferData() const
00367 {
00368   return ref_ 
00369     ?  static_cast<unsigned long*>(ref_->getDataLocation()) 
00370     : 0UL;
00371 }
00372 
00373 void detail::ChainData::swap(ChainData& other)
00374 {
00375   streamTags_.swap(other.streamTags_);
00376   eventConsumerTags_.swap(other.eventConsumerTags_);
00377   dqmEventConsumerTags_.swap(other.dqmEventConsumerTags_);
00378   std::swap(ref_, other.ref_);
00379   std::swap(complete_, other.complete_);
00380   std::swap(faultyBits_, other.faultyBits_);
00381   std::swap(messageCode_, other.messageCode_);
00382   std::swap(i2oMessageCode_, other.i2oMessageCode_);
00383   std::swap(fragKey_, other.fragKey_);
00384   std::swap(fragmentCount_, other.fragmentCount_);
00385   std::swap(expectedNumberOfFragments_, other.expectedNumberOfFragments_);
00386   std::swap(creationTime_, other.creationTime_);
00387   std::swap(lastFragmentTime_, other.lastFragmentTime_);
00388   std::swap(staleWindowStartTime_, other.staleWindowStartTime_);
00389 }
00390 
00391 size_t detail::ChainData::memoryUsed() const
00392 {
00393   size_t memoryUsed = 0;
00394   toolbox::mem::Reference* curRef = ref_;
00395   while (curRef)
00396     {
00397       memoryUsed += curRef->getDataSize();
00398       curRef = curRef->getNextReference();
00399     }
00400   return memoryUsed;
00401 }
00402 
00403 unsigned long detail::ChainData::totalDataSize() const
00404 {
00405   unsigned long totalSize = 0;
00406   toolbox::mem::Reference* curRef = ref_;
00407   for (unsigned int idx = 0; idx < fragmentCount_; ++idx)
00408     {
00409       I2O_MESSAGE_FRAME *i2oMsg =
00410         (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
00411       if (!faulty())
00412         {
00413           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00414             (I2O_SM_MULTIPART_MESSAGE_FRAME*) i2oMsg;
00415           totalSize += smMsg->dataSize;
00416         }
00417       else if (i2oMsg)
00418         {
00419           totalSize += (i2oMsg->MessageSize*4);
00420         }
00421       
00422       curRef = curRef->getNextReference();
00423     }
00424   return totalSize;
00425 }
00426 
00427 unsigned long detail::ChainData::dataSize(int fragmentIndex) const
00428 {
00429   toolbox::mem::Reference* curRef = ref_;
00430   for (int idx = 0; idx < fragmentIndex; ++idx)
00431     {
00432       curRef = curRef->getNextReference();
00433     }
00434   
00435   I2O_MESSAGE_FRAME *i2oMsg =
00436     (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
00437   if (!faulty())
00438     {
00439       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00440         (I2O_SM_MULTIPART_MESSAGE_FRAME*) i2oMsg;
00441       return smMsg->dataSize;
00442     }
00443   else if (i2oMsg)
00444     {
00445       return (i2oMsg->MessageSize*4);
00446     }
00447   return 0;
00448 }
00449 
00450 unsigned char* detail::ChainData::dataLocation(int fragmentIndex) const
00451 {
00452   toolbox::mem::Reference* curRef = ref_;
00453   for (int idx = 0; idx < fragmentIndex; ++idx)
00454     {
00455       curRef = curRef->getNextReference();
00456     }
00457   
00458   if (!faulty())
00459     {
00460       return do_fragmentLocation(static_cast<unsigned char*>
00461                                  (curRef->getDataLocation()));
00462     }
00463   else
00464     {
00465       return static_cast<unsigned char*>(curRef->getDataLocation());
00466     }
00467 }
00468 
00469 unsigned int detail::ChainData::getFragmentID(int fragmentIndex) const
00470 {
00471   toolbox::mem::Reference* curRef = ref_;
00472   for (int idx = 0; idx < fragmentIndex; ++idx)
00473     {
00474       curRef = curRef->getNextReference();
00475     }
00476   
00477   if (parsable())
00478     {
00479       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00480         (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00481       return smMsg->frameCount;
00482     }
00483   else
00484     {
00485       return 0;
00486     }
00487 }
00488 
00489 unsigned int detail::ChainData::
00490 copyFragmentsIntoBuffer(std::vector<unsigned char>& targetBuffer) const
00491 {
00492   unsigned long fullSize = totalDataSize();
00493   if (targetBuffer.capacity() < fullSize)
00494     {
00495       targetBuffer.resize(fullSize);
00496     }
00497   unsigned char* targetLoc = (unsigned char*)&targetBuffer[0];
00498 
00499   toolbox::mem::Reference* curRef = ref_;
00500   while (curRef)
00501     {
00502       unsigned char* fragmentLoc =
00503         (unsigned char*) curRef->getDataLocation();
00504       unsigned long sourceSize = 0;
00505       unsigned char* sourceLoc = 0;
00506 
00507       if (!faulty())
00508         {
00509           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00510             (I2O_SM_MULTIPART_MESSAGE_FRAME*) fragmentLoc;
00511           sourceSize = smMsg->dataSize;
00512           sourceLoc = do_fragmentLocation(fragmentLoc);
00513         }
00514       else if (fragmentLoc)
00515         {
00516           I2O_MESSAGE_FRAME *i2oMsg = (I2O_MESSAGE_FRAME*) fragmentLoc;
00517           sourceSize = i2oMsg->MessageSize * 4;
00518           sourceLoc = fragmentLoc;
00519         }
00520 
00521       if (sourceSize > 0)
00522         {
00523           std::copy(sourceLoc, sourceLoc+sourceSize, targetLoc);
00524           targetLoc += sourceSize;
00525         }
00526 
00527       curRef = curRef->getNextReference();
00528     }
00529 
00530   return static_cast<unsigned int>(fullSize);
00531 }
00532 
00533 unsigned long detail::ChainData::headerSize() const
00534 {
00535   return do_headerSize();
00536 }
00537 
00538 unsigned char* detail::ChainData::headerLocation() const
00539 {
00540   return do_headerLocation();
00541 }
00542 
00543 std::string detail::ChainData::hltURL() const
00544 {
00545   if (parsable())
00546     {
00547       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00548         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00549       size_t size = std::min(strlen(smMsg->hltURL),
00550                              (size_t) MAX_I2O_SM_URLCHARS);
00551       std::string URL(smMsg->hltURL, size);
00552       return URL;
00553     }
00554   else
00555     {
00556       return "unavailable";
00557     }
00558 }
00559 
00560 std::string detail::ChainData::hltClassName() const
00561 {
00562   if (parsable())
00563     {
00564       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00565         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00566       size_t size = std::min(strlen(smMsg->hltClassName),
00567                              (size_t) MAX_I2O_SM_URLCHARS);
00568       std::string className(smMsg->hltClassName, size);
00569       return className;
00570     }
00571   else
00572     {
00573       return "unavailable";
00574     }
00575 }
00576 
00577 uint32_t detail::ChainData::outputModuleId() const
00578 {
00579   return do_outputModuleId();
00580 }
00581 
00582 
00583 std::string detail::ChainData::outputModuleLabel() const
00584 {
00585   return do_outputModuleLabel();
00586 }
00587 
00588 std::string detail::ChainData::topFolderName() const
00589 {
00590   return do_topFolderName();
00591 }
00592 
00593 DQMKey detail::ChainData::dqmKey() const
00594 {
00595   return do_dqmKey();
00596 }
00597 
00598 void detail::ChainData::hltTriggerNames(Strings& nameList) const
00599 {
00600   do_hltTriggerNames(nameList);
00601 }
00602 
00603 void detail::ChainData::hltTriggerSelections(Strings& nameList) const
00604 {
00605   do_hltTriggerSelections(nameList);
00606 }
00607 
00608 void detail::ChainData::l1TriggerNames(Strings& nameList) const
00609 {
00610   do_l1TriggerNames(nameList);
00611 }
00612 
00613 uint32_t detail::ChainData::hltTriggerCount() const
00614 {
00615   return do_hltTriggerCount();
00616 }
00617 
00618 void
00619 detail::ChainData::hltTriggerBits(std::vector<unsigned char>& bitList) const
00620 {
00621   do_hltTriggerBits(bitList);
00622 }
00623 
00624 void detail::ChainData::assertRunNumber(uint32_t runNumber)
00625 {
00626   do_assertRunNumber(runNumber);
00627 }
00628 
00629 uint32_t detail::ChainData::runNumber() const
00630 {
00631   return do_runNumber();
00632 }
00633 
00634 uint32_t detail::ChainData::lumiSection() const
00635 {
00636   return do_lumiSection();
00637 }
00638 
00639 uint32_t detail::ChainData::eventNumber() const
00640 {
00641   return do_eventNumber();
00642 }
00643 
00644 uint32_t detail::ChainData::adler32Checksum() const
00645 {
00646   return do_adler32Checksum();
00647 }
00648 
00649 void detail::ChainData::tagForStream(StreamID streamId)
00650 {
00651   streamTags_.push_back(streamId);
00652 }
00653 
00654 void detail::ChainData::tagForEventConsumer(QueueID queueId)
00655 {
00656   eventConsumerTags_.push_back(queueId);
00657 }
00658 
00659 void detail::ChainData::tagForDQMEventConsumer(QueueID queueId)
00660 {
00661   dqmEventConsumerTags_.push_back(queueId);
00662 }
00663 
00664 std::vector<StreamID> const& detail::ChainData::getStreamTags() const
00665 {
00666   return streamTags_;
00667 }
00668 
00669 QueueIDs const& detail::ChainData::getEventConsumerTags() const
00670 {
00671   return eventConsumerTags_;
00672 }
00673 
00674 QueueIDs const& detail::ChainData::getDQMEventConsumerTags() const
00675 {
00676   return dqmEventConsumerTags_;
00677 }
00678 
00679 unsigned int detail::ChainData::droppedEventsCount() const
00680 {
00681   return do_droppedEventsCount();
00682 }
00683 
00684 void detail::ChainData::setDroppedEventsCount(unsigned int count)
00685 {
00686   do_setDroppedEventsCount(count);
00687 }
00688 
00689 bool detail::ChainData::isEndOfLumiSectionMessage() const
00690 {
00691   return ( i2oMessageCode_ == I2O_EVM_LUMISECTION );
00692 }
00693 
00694 bool
00695 detail::ChainData::validateDataLocation(toolbox::mem::Reference* ref,
00696                                         BitMasksForFaulty maskToUse)
00697 {
00698   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00699     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00700   if (!pvtMsg)
00701     {
00702       faultyBits_ |= maskToUse;
00703       return false;
00704     }
00705   return true;
00706 }
00707 
00708 bool
00709 detail::ChainData::validateMessageSize(toolbox::mem::Reference* ref,
00710                                        BitMasksForFaulty maskToUse)
00711 {
00712   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00713     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00714   if ((size_t)(pvtMsg->StdMessageFrame.MessageSize*4) <
00715       sizeof(I2O_SM_MULTIPART_MESSAGE_FRAME))
00716     {
00717       faultyBits_ |= maskToUse;
00718       return false;
00719     }
00720   return true;
00721 }
00722 
00723 bool
00724 detail::ChainData::validateFragmentIndexAndCount(toolbox::mem::Reference* ref,
00725                                                  BitMasksForFaulty maskToUse)
00726 {
00727   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00728     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00729   if (smMsg->numFrames < 1 || smMsg->frameCount >= smMsg->numFrames)
00730     {
00731       faultyBits_ |= maskToUse;
00732       return false;
00733     }
00734   return true;
00735 }
00736 
00737 bool
00738 detail::ChainData::validateExpectedFragmentCount(toolbox::mem::Reference* ref,
00739                                                  BitMasksForFaulty maskToUse)
00740 {
00741   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00742     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00743   if (smMsg->numFrames != expectedNumberOfFragments_)
00744     {
00745       faultyBits_ |= maskToUse;
00746       return false;
00747     }
00748   return true;
00749 }
00750 
00751 bool
00752 detail::ChainData::validateFragmentOrder(toolbox::mem::Reference* ref,
00753                                          int& indexValue)
00754 {
00755   int problemCount = 0;
00756   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00757     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00758   int thisIndex = static_cast<int>(smMsg->frameCount);
00759   if (thisIndex == indexValue)
00760     {
00761       faultyBits_ |= DUPLICATE_FRAGMENT;
00762       ++problemCount;
00763     }
00764   else if (thisIndex < indexValue)
00765     {
00766       faultyBits_ |= FRAGMENTS_OUT_OF_ORDER;
00767       ++problemCount;
00768     }
00769   indexValue = thisIndex;
00770   return (problemCount == 0);
00771 }
00772 
00773 bool
00774 detail::ChainData::validateMessageCode(toolbox::mem::Reference* ref,
00775                                        unsigned short expectedI2OMessageCode)
00776 {
00777   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00778     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00779   if (pvtMsg->XFunctionCode != expectedI2OMessageCode)
00780     {
00781       faultyBits_ |= CORRUPT_SECONDARY_HEADER;
00782       return false;
00783     }
00784   return true;
00785 }
00786 
00787 bool detail::ChainData::validateAdler32Checksum()
00788 {
00789   if ( !complete() || !headerOkay() ) return false;
00790 
00791   const uint32_t expected = adler32Checksum();
00792   if (expected == 0) return false; // Adler32 not available
00793 
00794   const uint32_t calculated = calculateAdler32();
00795 
00796   if ( calculated != expected )
00797   {
00798     faultyBits_ |= WRONG_CHECKSUM;
00799     return false;
00800   }
00801   return true;
00802 }
00803 
00804 uint32_t detail::ChainData::calculateAdler32() const
00805 {
00806   uint32_t adler = adler32(0L, 0, 0);
00807   
00808   toolbox::mem::Reference* curRef = ref_;
00809 
00810   I2O_SM_MULTIPART_MESSAGE_FRAME* smMsg =
00811     (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00812 
00813   //skip event header in first fragment
00814   const unsigned long headerSize = do_headerSize();
00815   unsigned long offset = 0;
00816 
00817   const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
00818   if ( headerSize > payloadSize )
00819   {
00820     // Header continues into next fragment
00821     offset = headerSize - payloadSize;
00822   }
00823   else
00824   {
00825     const unsigned char* dataLocation = 
00826       do_fragmentLocation((unsigned char*)curRef->getDataLocation()) + headerSize;
00827     adler = adler32(adler, dataLocation, smMsg->dataSize - headerSize);
00828   }
00829 
00830   curRef = curRef->getNextReference();
00831 
00832   while (curRef)
00833   {
00834     smMsg = (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00835 
00836     const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
00837     if ( offset > payloadSize )
00838     {
00839       offset -= payloadSize;
00840     }
00841     else
00842     {
00843       const unsigned char* dataLocation =
00844         do_fragmentLocation((unsigned char*)curRef->getDataLocation()) + offset;
00845       adler = adler32(adler, dataLocation, smMsg->dataSize - offset);
00846       offset = 0;
00847     }
00848 
00849     curRef = curRef->getNextReference();
00850   }
00851 
00852   return adler;
00853 }
00854 
00855 uint32_t detail::ChainData::do_outputModuleId() const
00856 {
00857   std::stringstream msg;
00858   msg << "An output module ID is only available from a valid, ";
00859   msg << "complete INIT or Event message.";
00860   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00861 }
00862 
00863 std::string detail::ChainData::do_outputModuleLabel() const
00864 {
00865   std::stringstream msg;
00866   msg << "An output module label is only available from a valid, ";
00867   msg << "complete INIT message.";
00868   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00869 }
00870 
00871 std::string detail::ChainData::do_topFolderName() const
00872 {
00873   std::stringstream msg;
00874   msg << "A top folder name is only available from a valid, ";
00875   msg << "complete DQM event message.";
00876   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00877 }
00878 
00879 DQMKey detail::ChainData::do_dqmKey() const
00880 {
00881   std::stringstream msg;
00882   msg << "The DQM key is only available from a valid, ";
00883   msg << "complete DQM event message.";
00884   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00885 }
00886 
00887 void detail::ChainData::do_hltTriggerNames(Strings& nameList) const
00888 {
00889   std::stringstream msg;
00890   msg << "The HLT trigger names are only available from a valid, ";
00891   msg << "complete INIT message.";
00892   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00893 }
00894 
00895 void detail::ChainData::do_hltTriggerSelections(Strings& nameList) const
00896 {
00897   std::stringstream msg;
00898   msg << "The HLT trigger selections are only available from a valid, ";
00899   msg << "complete INIT message.";
00900   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00901 }
00902 
00903 void detail::ChainData::do_l1TriggerNames(Strings& nameList) const
00904 {
00905   std::stringstream msg;
00906   msg << "The L1 trigger names are only available from a valid, ";
00907   msg << "complete INIT message.";
00908   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00909 }
00910 
00911 uint32_t detail::ChainData::do_hltTriggerCount() const
00912 {
00913   std::stringstream msg;
00914   msg << "An HLT trigger count is only available from a valid, ";
00915   msg << "complete Event message.";
00916   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00917 }
00918 
00919 void 
00920 detail::ChainData::do_hltTriggerBits(std::vector<unsigned char>& bitList) const
00921 {
00922   std::stringstream msg;
00923   msg << "The HLT trigger bits are only available from a valid, ";
00924   msg << "complete Event message.";
00925   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00926 }
00927 
00928 unsigned int
00929 detail::ChainData::do_droppedEventsCount() const
00930 {
00931   std::stringstream msg;
00932   msg << "Dropped events count can only be retrieved from a valid, ";
00933   msg << "complete Event message.";
00934   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00935 }
00936 
00937 void 
00938 detail::ChainData::do_setDroppedEventsCount(unsigned int count)
00939 {
00940   std::stringstream msg;
00941   msg << "Dropped events count can only be added to a valid, ";
00942   msg << "complete Event message.";
00943   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00944 }
00945 
00946 uint32_t detail::ChainData::do_runNumber() const
00947 {
00948   std::stringstream msg;
00949   msg << "A run number is only available from a valid, ";
00950   msg << "complete EVENT or ERROR_EVENT message.";
00951   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00952 }
00953 
00954 uint32_t detail::ChainData::do_lumiSection() const
00955 {
00956   std::stringstream msg;
00957   msg << "A luminosity section is only available from a valid, ";
00958   msg << "complete EVENT or ERROR_EVENT message.";
00959   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00960 }
00961 
00962 uint32_t detail::ChainData::do_eventNumber() const
00963 {
00964   std::stringstream msg;
00965   msg << "An event number is only available from a valid, ";
00966   msg << "complete EVENT or ERROR_EVENT message.";
00967   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00968 }
00969 
00970