CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_4_1_8_patch9/src/EventFilter/StorageManager/src/ChainData.cc

Go to the documentation of this file.
00001 // $Id: ChainData.cc,v 1.14.4.1 2011/03/07 11:33:04 mommsen Exp $
00003 
00004 #include "IOPool/Streamer/interface/HLTInfo.h"
00005 #include "IOPool/Streamer/interface/MsgHeader.h"
00006 
00007 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00008 
00009 #include "EventFilter/StorageManager/interface/Utils.h"
00010 #include "EventFilter/StorageManager/interface/DQMKey.h"
00011 #include "EventFilter/StorageManager/interface/StreamID.h"
00012 #include "EventFilter/StorageManager/interface/QueueID.h"
00013 #include "EventFilter/StorageManager/interface/Exception.h"
00014 
00015 #include "EventFilter/StorageManager/src/ChainData.h"
00016 
00017 #include "interface/shared/i2oXFunctionCodes.h"
00018 #include "interface/shared/version.h"
00019 
00020 #include <stdlib.h>
00021 #include "zlib.h"
00022 
00023 
00024 using namespace stor;
00025 
00026 // A ChainData object may or may not contain a Reference.
00027 detail::ChainData::ChainData(const unsigned short i2oMessageCode,
00028                              const unsigned int messageCode) :
00029   streamTags_(),
00030   eventConsumerTags_(),
00031   dqmEventConsumerTags_(),
00032   ref_(0),
00033   complete_(false),
00034   faultyBits_(INCOMPLETE_MESSAGE),
00035   messageCode_(messageCode),
00036   i2oMessageCode_(i2oMessageCode),
00037   fragKey_(Header::INVALID,0,0,0,0,0),
00038   fragmentCount_(0),
00039   expectedNumberOfFragments_(0),
00040   rbBufferId_(0),
00041   hltLocalId_(0),
00042   hltInstance_(0),
00043   hltTid_(0),
00044   fuProcessId_(0),
00045   fuGuid_(0)
00046 {
00047   #ifdef STOR_DEBUG_CORRUPT_MESSAGES
00048   double r = rand()/static_cast<double>(RAND_MAX);
00049   if (r < 0.001)
00050   {
00051     // std::cout << "Simulating corrupt I2O message" << std::endl;
00052     // markCorrupt();
00053   }
00054   else if (r < 0.02)
00055   {
00056     std::cout << "Simulating faulty I2O message" << std::endl;
00057     markFaulty();
00058   }
00059   #endif // STOR_DEBUG_CORRUPT_MESSAGES
00060 }
00061 
00062 // A ChainData that has a Reference is in charge of releasing
00063 // it. Because releasing a Reference can throw an exception, we have
00064 // to be prepared to swallow the exception. This is fairly gross,
00065 // because we lose any exception information. But allowing an
00066 // exception to escape from a destructor is even worse, so we must
00067 // do what we must do.
00068 //
00069 detail::ChainData::~ChainData()
00070 {
00071   if (ref_) 
00072     {
00073       //std::cout << std::endl << std::endl << std::hex
00074       //          << "### releasing 0x" << ((int) ref_)
00075       //          << std::dec << std::endl << std::endl;
00076       try { ref_->release(); }
00077       catch (...) { /* swallow any exception. */ }
00078     }
00079 }
00080 
00081 bool detail::ChainData::empty() const
00082 {
00083   return !ref_;
00084 }
00085 
00086 bool detail::ChainData::complete() const
00087 {
00088   return complete_;
00089 }
00090 
00091 bool detail::ChainData::faulty() const
00092 {
00093   if (complete_)
00094     return (faultyBits_ != 0);
00095   else
00096     return (faultyBits_ != INCOMPLETE_MESSAGE);
00097 }
00098 
00099 unsigned int detail::ChainData::faultyBits() const
00100 {
00101   return faultyBits_;
00102 }
00103 
00104 bool detail::ChainData::parsable() const
00105 {
00106   return (ref_) && 
00107     ((faultyBits_ & INVALID_INITIAL_REFERENCE & ~INCOMPLETE_MESSAGE) == 0) &&
00108     ((faultyBits_ & CORRUPT_INITIAL_HEADER & ~INCOMPLETE_MESSAGE) == 0);
00109 }
00110 
00111 bool detail::ChainData::headerOkay() const
00112 {
00113   return ( (faultyBits_ & ~WRONG_CHECKSUM) == 0);
00114 }
00115 
00116 void detail::ChainData::addFirstFragment(toolbox::mem::Reference* pRef)
00117 {
00118   if (ref_)
00119   {
00120     XCEPT_RAISE(stor::exception::I2OChain, "Cannot add a first fragment to a non-empty I2OChain.");
00121   }
00122   ref_ = pRef;
00123 
00124   // Avoid the situation in which all unparsable chains
00125   // have the same fragment key.  We do this by providing a 
00126   // variable default value for one of the fragKey fields.
00127   if (pRef)
00128     {
00129       fragKey_.secondaryId_ = static_cast<uint32_t>(
00130         (uintptr_t)pRef->getDataLocation()
00131       );
00132     }
00133   else
00134     {
00135       fragKey_.secondaryId_ = static_cast<uint32_t>( time(0) );
00136     }
00137 
00138   if (pRef)
00139     {
00140       creationTime_ = utils::getCurrentTime();
00141       lastFragmentTime_ = creationTime_;
00142       staleWindowStartTime_ = creationTime_;
00143 
00144       // first fragment in Reference chain
00145       ++fragmentCount_;
00146       int workingIndex = -1;
00147 
00148       if (validateDataLocation(pRef, INVALID_INITIAL_REFERENCE) &&
00149           validateMessageSize(pRef, CORRUPT_INITIAL_HEADER) &&
00150           validateFragmentIndexAndCount(pRef, CORRUPT_INITIAL_HEADER))
00151         {
00152           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00153             (I2O_SM_MULTIPART_MESSAGE_FRAME*) pRef->getDataLocation();
00154           expectedNumberOfFragments_ = smMsg->numFrames;
00155           validateFragmentOrder(pRef, workingIndex);
00156         }
00157 
00158       // subsequent fragments in Reference chain
00159       toolbox::mem::Reference* curRef = pRef->getNextReference();
00160       while (curRef)
00161         {
00162           ++fragmentCount_;
00163           
00164           if (validateDataLocation(curRef, INVALID_SECONDARY_REFERENCE) &&
00165               validateMessageSize(curRef, CORRUPT_SECONDARY_HEADER) &&
00166               validateFragmentIndexAndCount(curRef, CORRUPT_SECONDARY_HEADER))
00167             {
00168               validateExpectedFragmentCount(curRef, TOTAL_COUNT_MISMATCH);
00169               validateFragmentOrder(curRef, workingIndex);
00170               validateMessageCode(curRef, i2oMessageCode_);
00171             }
00172           
00173           curRef = curRef->getNextReference();
00174         }
00175     }
00176 
00177   checkForCompleteness();
00178 }
00179 
00180 void detail::ChainData::addToChain(ChainData const& newpart)
00181 {
00182   if ( this->empty() )
00183   {
00184     addFirstFragment(newpart.ref_);
00185     return;
00186   }
00187 
00188   if (parsable() && newpart.parsable())
00189   {
00190     // loop over the fragments in the new part
00191     toolbox::mem::Reference* newRef = newpart.ref_;
00192     while (newRef)
00193     {
00194       // unlink the next element in the new chain from that chain
00195       toolbox::mem::Reference* nextNewRef = newRef->getNextReference();
00196       newRef->setNextReference(0);
00197       
00198       // if the new fragment that we're working with is the first one
00199       // in its chain, we need to duplicate it (now that it is unlinked)
00200       // This is necessary since it is still being managed by an I2OChain
00201       // somewhere.  The subsequent fragments in the new part do not
00202       // need to be duplicated since we explicitly unlinked them from
00203       // the first one.
00204       if (newRef == newpart.ref_) {newRef = newpart.ref_->duplicate();}
00205       
00206       // we want to track whether the fragment was added (it *always* should be)
00207       bool fragmentWasAdded = false;
00208       
00209       // determine the index of the new fragment
00210       I2O_SM_MULTIPART_MESSAGE_FRAME *thatMsg =
00211         (I2O_SM_MULTIPART_MESSAGE_FRAME*) newRef->getDataLocation();
00212       unsigned int newIndex = thatMsg->frameCount;
00213       //std::cout << "newIndex = " << newIndex << std::endl;
00214       
00215       // verify that the total fragment counts match
00216       unsigned int newFragmentTotalCount = thatMsg->numFrames;
00217       if (newFragmentTotalCount != expectedNumberOfFragments_)
00218       {
00219         faultyBits_ |= TOTAL_COUNT_MISMATCH;
00220       }
00221       
00222       // if the new fragment goes at the head of the chain, handle that here
00223       I2O_SM_MULTIPART_MESSAGE_FRAME *fragMsg =
00224         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00225       unsigned int firstIndex = fragMsg->frameCount;
00226       //std::cout << "firstIndex = " << firstIndex << std::endl;
00227       if (newIndex < firstIndex)
00228       {
00229         newRef->setNextReference(ref_);
00230         ref_ = newRef;
00231         fragmentWasAdded = true;
00232       }
00233 
00234       else
00235       {
00236         // loop over the existing fragments and insert the new one
00237         // in the correct place
00238         toolbox::mem::Reference* curRef = ref_;
00239         for (unsigned int idx = 0; idx < fragmentCount_; ++idx)
00240         {
00241           // if we have a duplicate fragment, add it after the existing
00242           // one and indicate the error
00243           I2O_SM_MULTIPART_MESSAGE_FRAME *curMsg =
00244             (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00245           unsigned int curIndex = curMsg->frameCount;
00246           //std::cout << "curIndex = " << curIndex << std::endl;
00247           if (newIndex == curIndex) 
00248           {
00249             faultyBits_ |= DUPLICATE_FRAGMENT;
00250             newRef->setNextReference(curRef->getNextReference());
00251             curRef->setNextReference(newRef);
00252             fragmentWasAdded = true;
00253             break;
00254           }
00255           
00256           // if we have reached the end of the chain, add the
00257           // new fragment to the end
00258           //std::cout << "nextRef = " << ((int) nextRef) << std::endl;
00259           toolbox::mem::Reference* nextRef = curRef->getNextReference();
00260           if (nextRef == 0)
00261           {
00262             curRef->setNextReference(newRef);
00263             fragmentWasAdded = true;
00264             break;
00265           }
00266           
00267           I2O_SM_MULTIPART_MESSAGE_FRAME *nextMsg =
00268             (I2O_SM_MULTIPART_MESSAGE_FRAME*) nextRef->getDataLocation();
00269           unsigned int nextIndex = nextMsg->frameCount;
00270           //std::cout << "nextIndex = " << nextIndex << std::endl;
00271           if (newIndex > curIndex && newIndex < nextIndex)
00272           {
00273             newRef->setNextReference(curRef->getNextReference());
00274             curRef->setNextReference(newRef);
00275             fragmentWasAdded = true;
00276             break;
00277           }
00278           
00279           curRef = nextRef;
00280         }
00281       }
00282       
00283       // update the fragment count and check if the chain is now complete
00284       if (!fragmentWasAdded)
00285       {
00286         // this should never happen - if it does, there is a logic
00287         // error in the loop above
00288         XCEPT_RAISE(stor::exception::I2OChain,
00289           "A fragment was unable to be added to a chain.");
00290       }
00291       ++fragmentCount_;
00292       
00293       newRef = nextNewRef;
00294     }
00295   }
00296   else
00297   {
00298     // if either the current chain or the newpart are nor parsable,
00299     // we simply append the new stuff to the end of the existing chain
00300     
00301     // update our fragment count to include the new fragments
00302     toolbox::mem::Reference* curRef = newpart.ref_;
00303     while (curRef) {
00304       ++fragmentCount_;
00305       curRef = curRef->getNextReference();
00306     }
00307     
00308     // append the new fragments to the end of the existing chain
00309     toolbox::mem::Reference* lastRef = ref_;
00310     curRef = ref_->getNextReference();
00311     while (curRef) {
00312       lastRef = curRef;
00313       curRef = curRef->getNextReference();
00314     }
00315     lastRef->setNextReference(newpart.ref_->duplicate());
00316     
00317     // update the time stamps
00318     lastFragmentTime_ = utils::getCurrentTime();
00319     staleWindowStartTime_ = lastFragmentTime_;
00320     if (newpart.creationTime() < creationTime_)
00321     {
00322       creationTime_ = newpart.creationTime();
00323     }
00324     
00325     return;
00326   }
00327 
00328   // merge the faulty flags from the new part into the existing flags
00329   faultyBits_ |= newpart.faultyBits_;
00330 
00331   // update the time stamps
00332   lastFragmentTime_ = utils::getCurrentTime();
00333   staleWindowStartTime_ = lastFragmentTime_;
00334   if (newpart.creationTime() < creationTime_)
00335   {
00336     creationTime_ = newpart.creationTime();
00337   }
00338   
00339   checkForCompleteness();
00340 }
00341 
00342 void detail::ChainData::checkForCompleteness()
00343 {
00344   if ((fragmentCount_ == expectedNumberOfFragments_) &&
00345     ((faultyBits_ & (TOTAL_COUNT_MISMATCH | FRAGMENTS_OUT_OF_ORDER | DUPLICATE_FRAGMENT)) == 0))
00346     markComplete();
00347 }
00348 
00349 void detail::ChainData::markComplete()
00350 {
00351   faultyBits_ &= ~INCOMPLETE_MESSAGE; // reset incomplete bit
00352   complete_ = true;
00353   validateAdler32Checksum();
00354 }
00355 
00356 void detail::ChainData::markFaulty()
00357 {
00358   faultyBits_ |= EXTERNALLY_REQUESTED;
00359 }
00360 
00361 void detail::ChainData::markCorrupt()
00362 {
00363   faultyBits_ |= CORRUPT_INITIAL_HEADER;
00364 }
00365 
00366 unsigned long* detail::ChainData::getBufferData() const
00367 {
00368   return ref_ 
00369     ?  static_cast<unsigned long*>(ref_->getDataLocation()) 
00370     : 0UL;
00371 }
00372 
00373 void detail::ChainData::swap(ChainData& other)
00374 {
00375   streamTags_.swap(other.streamTags_);
00376   eventConsumerTags_.swap(other.eventConsumerTags_);
00377   dqmEventConsumerTags_.swap(other.dqmEventConsumerTags_);
00378   std::swap(ref_, other.ref_);
00379   std::swap(complete_, other.complete_);
00380   std::swap(faultyBits_, other.faultyBits_);
00381   std::swap(messageCode_, other.messageCode_);
00382   std::swap(i2oMessageCode_, other.i2oMessageCode_);
00383   std::swap(fragKey_, other.fragKey_);
00384   std::swap(fragmentCount_, other.fragmentCount_);
00385   std::swap(expectedNumberOfFragments_, other.expectedNumberOfFragments_);
00386   std::swap(creationTime_, other.creationTime_);
00387   std::swap(lastFragmentTime_, other.lastFragmentTime_);
00388   std::swap(staleWindowStartTime_, other.staleWindowStartTime_);
00389 }
00390 
00391 size_t detail::ChainData::memoryUsed() const
00392 {
00393   size_t memoryUsed = 0;
00394   toolbox::mem::Reference* curRef = ref_;
00395   while (curRef)
00396     {
00397       memoryUsed += curRef->getDataSize();
00398       curRef = curRef->getNextReference();
00399     }
00400   return memoryUsed;
00401 }
00402 
00403 unsigned long detail::ChainData::totalDataSize() const
00404 {
00405   unsigned long totalSize = 0;
00406   toolbox::mem::Reference* curRef = ref_;
00407   for (unsigned int idx = 0; idx < fragmentCount_; ++idx)
00408     {
00409       I2O_MESSAGE_FRAME *i2oMsg =
00410         (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
00411       if (!faulty())
00412         {
00413           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00414             (I2O_SM_MULTIPART_MESSAGE_FRAME*) i2oMsg;
00415           totalSize += smMsg->dataSize;
00416         }
00417       else if (i2oMsg)
00418         {
00419           totalSize += (i2oMsg->MessageSize*4);
00420         }
00421       
00422       curRef = curRef->getNextReference();
00423     }
00424   return totalSize;
00425 }
00426 
00427 unsigned long detail::ChainData::dataSize(int fragmentIndex) const
00428 {
00429   toolbox::mem::Reference* curRef = ref_;
00430   for (int idx = 0; idx < fragmentIndex; ++idx)
00431     {
00432       curRef = curRef->getNextReference();
00433     }
00434   
00435   I2O_MESSAGE_FRAME *i2oMsg =
00436     (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
00437   if (!faulty())
00438     {
00439       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00440         (I2O_SM_MULTIPART_MESSAGE_FRAME*) i2oMsg;
00441       return smMsg->dataSize;
00442     }
00443   else if (i2oMsg)
00444     {
00445       return (i2oMsg->MessageSize*4);
00446     }
00447   return 0;
00448 }
00449 
00450 unsigned char* detail::ChainData::dataLocation(int fragmentIndex) const
00451 {
00452   toolbox::mem::Reference* curRef = ref_;
00453   for (int idx = 0; idx < fragmentIndex; ++idx)
00454     {
00455       curRef = curRef->getNextReference();
00456     }
00457   
00458   if (!faulty())
00459     {
00460       return do_fragmentLocation(static_cast<unsigned char*>
00461                                  (curRef->getDataLocation()));
00462     }
00463   else
00464     {
00465       return static_cast<unsigned char*>(curRef->getDataLocation());
00466     }
00467 }
00468 
00469 unsigned int detail::ChainData::getFragmentID(int fragmentIndex) const
00470 {
00471   toolbox::mem::Reference* curRef = ref_;
00472   for (int idx = 0; idx < fragmentIndex; ++idx)
00473     {
00474       curRef = curRef->getNextReference();
00475     }
00476   
00477   if (parsable())
00478     {
00479       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00480         (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00481       return smMsg->frameCount;
00482     }
00483   else
00484     {
00485       return 0;
00486     }
00487 }
00488 
00489 unsigned int detail::ChainData::
00490 copyFragmentsIntoBuffer(std::vector<unsigned char>& targetBuffer) const
00491 {
00492   unsigned long fullSize = totalDataSize();
00493   if (targetBuffer.capacity() < fullSize)
00494     {
00495       targetBuffer.resize(fullSize);
00496     }
00497   unsigned char* targetLoc = (unsigned char*)&targetBuffer[0];
00498 
00499   toolbox::mem::Reference* curRef = ref_;
00500   while (curRef)
00501     {
00502       unsigned char* fragmentLoc =
00503         (unsigned char*) curRef->getDataLocation();
00504       unsigned long sourceSize = 0;
00505       unsigned char* sourceLoc = 0;
00506 
00507       if (!faulty())
00508         {
00509           I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00510             (I2O_SM_MULTIPART_MESSAGE_FRAME*) fragmentLoc;
00511           sourceSize = smMsg->dataSize;
00512           sourceLoc = do_fragmentLocation(fragmentLoc);
00513         }
00514       else if (fragmentLoc)
00515         {
00516           I2O_MESSAGE_FRAME *i2oMsg = (I2O_MESSAGE_FRAME*) fragmentLoc;
00517           sourceSize = i2oMsg->MessageSize * 4;
00518           sourceLoc = fragmentLoc;
00519         }
00520 
00521       if (sourceSize > 0)
00522         {
00523           std::copy(sourceLoc, sourceLoc+sourceSize, targetLoc);
00524           targetLoc += sourceSize;
00525         }
00526 
00527       curRef = curRef->getNextReference();
00528     }
00529 
00530   return static_cast<unsigned int>(fullSize);
00531 }
00532 
00533 unsigned long detail::ChainData::headerSize() const
00534 {
00535   return do_headerSize();
00536 }
00537 
00538 unsigned char* detail::ChainData::headerLocation() const
00539 {
00540   return do_headerLocation();
00541 }
00542 
00543 std::string detail::ChainData::hltURL() const
00544 {
00545   if (parsable())
00546     {
00547       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00548         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00549       size_t size = std::min(strlen(smMsg->hltURL),
00550                              (size_t) MAX_I2O_SM_URLCHARS);
00551       std::string URL(smMsg->hltURL, size);
00552       return URL;
00553     }
00554   else
00555     {
00556       return "unavailable";
00557     }
00558 }
00559 
00560 std::string detail::ChainData::hltClassName() const
00561 {
00562   if (parsable())
00563     {
00564       I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00565         (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref_->getDataLocation();
00566       size_t size = std::min(strlen(smMsg->hltClassName),
00567                              (size_t) MAX_I2O_SM_URLCHARS);
00568       std::string className(smMsg->hltClassName, size);
00569       return className;
00570     }
00571   else
00572     {
00573       return "unavailable";
00574     }
00575 }
00576 
00577 uint32_t detail::ChainData::outputModuleId() const
00578 {
00579   return do_outputModuleId();
00580 }
00581 
00582 
00583 std::string detail::ChainData::outputModuleLabel() const
00584 {
00585   return do_outputModuleLabel();
00586 }
00587 
00588 std::string detail::ChainData::topFolderName() const
00589 {
00590   return do_topFolderName();
00591 }
00592 
00593 DQMKey detail::ChainData::dqmKey() const
00594 {
00595   return do_dqmKey();
00596 }
00597 
00598 void detail::ChainData::hltTriggerNames(Strings& nameList) const
00599 {
00600   do_hltTriggerNames(nameList);
00601 }
00602 
00603 void detail::ChainData::hltTriggerSelections(Strings& nameList) const
00604 {
00605   do_hltTriggerSelections(nameList);
00606 }
00607 
00608 void detail::ChainData::l1TriggerNames(Strings& nameList) const
00609 {
00610   do_l1TriggerNames(nameList);
00611 }
00612 
00613 uint32_t detail::ChainData::hltTriggerCount() const
00614 {
00615   return do_hltTriggerCount();
00616 }
00617 
00618 void
00619 detail::ChainData::hltTriggerBits(std::vector<unsigned char>& bitList) const
00620 {
00621   do_hltTriggerBits(bitList);
00622 }
00623 
00624 void detail::ChainData::assertRunNumber(uint32_t runNumber)
00625 {
00626   do_assertRunNumber(runNumber);
00627 }
00628 
00629 uint32_t detail::ChainData::runNumber() const
00630 {
00631   return do_runNumber();
00632 }
00633 
00634 uint32_t detail::ChainData::lumiSection() const
00635 {
00636   return do_lumiSection();
00637 }
00638 
00639 uint32_t detail::ChainData::eventNumber() const
00640 {
00641   return do_eventNumber();
00642 }
00643 
00644 uint32_t detail::ChainData::adler32Checksum() const
00645 {
00646   return do_adler32Checksum();
00647 }
00648 
00649 void detail::ChainData::tagForStream(StreamID streamId)
00650 {
00651   streamTags_.push_back(streamId);
00652 }
00653 
00654 void detail::ChainData::tagForEventConsumer(QueueID queueId)
00655 {
00656   eventConsumerTags_.push_back(queueId);
00657 }
00658 
00659 void detail::ChainData::tagForDQMEventConsumer(QueueID queueId)
00660 {
00661   dqmEventConsumerTags_.push_back(queueId);
00662 }
00663 
00664 std::vector<StreamID> const& detail::ChainData::getStreamTags() const
00665 {
00666   return streamTags_;
00667 }
00668 
00669 QueueIDs const& detail::ChainData::getEventConsumerTags() const
00670 {
00671   return eventConsumerTags_;
00672 }
00673 
00674 QueueIDs const& detail::ChainData::getDQMEventConsumerTags() const
00675 {
00676   return dqmEventConsumerTags_;
00677 }
00678 
00679 bool detail::ChainData::isEndOfLumiSectionMessage() const
00680 {
00681   return ( i2oMessageCode_ == I2O_EVM_LUMISECTION );
00682 }
00683 
00684 bool
00685 detail::ChainData::validateDataLocation(toolbox::mem::Reference* ref,
00686                                         BitMasksForFaulty maskToUse)
00687 {
00688   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00689     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00690   if (!pvtMsg)
00691     {
00692       faultyBits_ |= maskToUse;
00693       return false;
00694     }
00695   return true;
00696 }
00697 
00698 bool
00699 detail::ChainData::validateMessageSize(toolbox::mem::Reference* ref,
00700                                        BitMasksForFaulty maskToUse)
00701 {
00702   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00703     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00704   if ((size_t)(pvtMsg->StdMessageFrame.MessageSize*4) <
00705       sizeof(I2O_SM_MULTIPART_MESSAGE_FRAME))
00706     {
00707       faultyBits_ |= maskToUse;
00708       return false;
00709     }
00710   return true;
00711 }
00712 
00713 bool
00714 detail::ChainData::validateFragmentIndexAndCount(toolbox::mem::Reference* ref,
00715                                                  BitMasksForFaulty maskToUse)
00716 {
00717   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00718     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00719   if (smMsg->numFrames < 1 || smMsg->frameCount >= smMsg->numFrames)
00720     {
00721       faultyBits_ |= maskToUse;
00722       return false;
00723     }
00724   return true;
00725 }
00726 
00727 bool
00728 detail::ChainData::validateExpectedFragmentCount(toolbox::mem::Reference* ref,
00729                                                  BitMasksForFaulty maskToUse)
00730 {
00731   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00732     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00733   if (smMsg->numFrames != expectedNumberOfFragments_)
00734     {
00735       faultyBits_ |= maskToUse;
00736       return false;
00737     }
00738   return true;
00739 }
00740 
00741 bool
00742 detail::ChainData::validateFragmentOrder(toolbox::mem::Reference* ref,
00743                                          int& indexValue)
00744 {
00745   int problemCount = 0;
00746   I2O_SM_MULTIPART_MESSAGE_FRAME *smMsg =
00747     (I2O_SM_MULTIPART_MESSAGE_FRAME*) ref->getDataLocation();
00748   int thisIndex = static_cast<int>(smMsg->frameCount);
00749   if (thisIndex == indexValue)
00750     {
00751       faultyBits_ |= DUPLICATE_FRAGMENT;
00752       ++problemCount;
00753     }
00754   else if (thisIndex < indexValue)
00755     {
00756       faultyBits_ |= FRAGMENTS_OUT_OF_ORDER;
00757       ++problemCount;
00758     }
00759   indexValue = thisIndex;
00760   return (problemCount == 0);
00761 }
00762 
00763 bool
00764 detail::ChainData::validateMessageCode(toolbox::mem::Reference* ref,
00765                                        unsigned short expectedI2OMessageCode)
00766 {
00767   I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00768     (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
00769   if (pvtMsg->XFunctionCode != expectedI2OMessageCode)
00770     {
00771       faultyBits_ |= CORRUPT_SECONDARY_HEADER;
00772       return false;
00773     }
00774   return true;
00775 }
00776 
00777 bool detail::ChainData::validateAdler32Checksum()
00778 {
00779   if ( !complete() || !headerOkay() ) return false;
00780 
00781   const uint32_t expected = adler32Checksum();
00782   if (expected == 0) return false; // Adler32 not available
00783 
00784   const uint32_t calculated = calculateAdler32();
00785 
00786   if ( calculated != expected )
00787   {
00788     faultyBits_ |= WRONG_CHECKSUM;
00789     return false;
00790   }
00791   return true;
00792 }
00793 
00794 uint32_t detail::ChainData::calculateAdler32() const
00795 {
00796   uint32_t adler = adler32(0L, 0, 0);
00797   
00798   toolbox::mem::Reference* curRef = ref_;
00799 
00800   I2O_SM_MULTIPART_MESSAGE_FRAME* smMsg =
00801     (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00802 
00803   //skip event header in first fragment
00804   const unsigned long headerSize = do_headerSize();
00805   unsigned long offset = 0;
00806 
00807   const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
00808   if ( headerSize > payloadSize )
00809   {
00810     // Header continues into next fragment
00811     offset = headerSize - payloadSize;
00812   }
00813   else
00814   {
00815     const unsigned char* dataLocation = 
00816       do_fragmentLocation((unsigned char*)curRef->getDataLocation()) + headerSize;
00817     adler = adler32(adler, dataLocation, smMsg->dataSize - headerSize);
00818   }
00819 
00820   curRef = curRef->getNextReference();
00821 
00822   while (curRef)
00823   {
00824     smMsg = (I2O_SM_MULTIPART_MESSAGE_FRAME*) curRef->getDataLocation();
00825 
00826     const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
00827     if ( offset > payloadSize )
00828     {
00829       offset -= payloadSize;
00830     }
00831     else
00832     {
00833       const unsigned char* dataLocation =
00834         do_fragmentLocation((unsigned char*)curRef->getDataLocation()) + offset;
00835       adler = adler32(adler, dataLocation, smMsg->dataSize - offset);
00836       offset = 0;
00837     }
00838 
00839     curRef = curRef->getNextReference();
00840   }
00841 
00842   return adler;
00843 }
00844 
00845 uint32_t detail::ChainData::do_outputModuleId() const
00846 {
00847   std::stringstream msg;
00848   msg << "An output module ID is only available from a valid, ";
00849   msg << "complete INIT or Event message.";
00850   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00851 }
00852 
00853 std::string detail::ChainData::do_outputModuleLabel() const
00854 {
00855   std::stringstream msg;
00856   msg << "An output module label is only available from a valid, ";
00857   msg << "complete INIT message.";
00858   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00859 }
00860 
00861 std::string detail::ChainData::do_topFolderName() const
00862 {
00863   std::stringstream msg;
00864   msg << "A top folder name is only available from a valid, ";
00865   msg << "complete DQM event message.";
00866   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00867 }
00868 
00869 DQMKey detail::ChainData::do_dqmKey() const
00870 {
00871   std::stringstream msg;
00872   msg << "The DQM key is only available from a valid, ";
00873   msg << "complete DQM event message.";
00874   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00875 }
00876 
00877 void detail::ChainData::do_hltTriggerNames(Strings& nameList) const
00878 {
00879   std::stringstream msg;
00880   msg << "The HLT trigger names are only available from a valid, ";
00881   msg << "complete INIT message.";
00882   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00883 }
00884 
00885 void detail::ChainData::do_hltTriggerSelections(Strings& nameList) const
00886 {
00887   std::stringstream msg;
00888   msg << "The HLT trigger selections are only available from a valid, ";
00889   msg << "complete INIT message.";
00890   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00891 }
00892 
00893 void detail::ChainData::do_l1TriggerNames(Strings& nameList) const
00894 {
00895   std::stringstream msg;
00896   msg << "The L1 trigger names are only available from a valid, ";
00897   msg << "complete INIT message.";
00898   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00899 }
00900 
00901 uint32_t detail::ChainData::do_hltTriggerCount() const
00902 {
00903   std::stringstream msg;
00904   msg << "An HLT trigger count is only available from a valid, ";
00905   msg << "complete Event message.";
00906   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00907 }
00908 
00909 void 
00910 detail::ChainData::do_hltTriggerBits(std::vector<unsigned char>& bitList) const
00911 {
00912   std::stringstream msg;
00913   msg << "The HLT trigger bits are only available from a valid, ";
00914   msg << "complete Event message.";
00915   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00916 }
00917 
00918 uint32_t detail::ChainData::do_runNumber() const
00919 {
00920   std::stringstream msg;
00921   msg << "A run number is only available from a valid, ";
00922   msg << "complete EVENT or ERROR_EVENT message.";
00923   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00924 }
00925 
00926 uint32_t detail::ChainData::do_lumiSection() const
00927 {
00928   std::stringstream msg;
00929   msg << "A luminosity section is only available from a valid, ";
00930   msg << "complete EVENT or ERROR_EVENT message.";
00931   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00932 }
00933 
00934 uint32_t detail::ChainData::do_eventNumber() const
00935 {
00936   std::stringstream msg;
00937   msg << "An event number is only available from a valid, ";
00938   msg << "complete EVENT or ERROR_EVENT message.";
00939   XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
00940 }
00941 
00942