CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_5_3_3/src/EventFilter/StorageManager/src/I2OChain.cc

Go to the documentation of this file.
00001 // $Id: I2OChain.cc,v 1.27 2012/04/20 10:48:02 mommsen Exp $
00003 
00004 #include <algorithm>
00005 
00006 #include "EventFilter/StorageManager/interface/DQMKey.h"
00007 #include "EventFilter/StorageManager/interface/Exception.h"
00008 #include "EventFilter/StorageManager/interface/I2OChain.h"
00009 #include "EventFilter/StorageManager/interface/QueueID.h"
00010 
00011 #include "EventFilter/StorageManager/src/ChainData.h"
00012 
00013 #include "EventFilter/Utilities/interface/i2oEvfMsgs.h"
00014 
00015 #include "IOPool/Streamer/interface/MsgHeader.h"
00016 
00017 #include "interface/evb/i2oEVBMsgs.h"
00018 #include "interface/shared/i2oXFunctionCodes.h"
00019 #include "interface/shared/version.h"
00020 
00021 
00022 namespace stor
00023 {
00024 
00025   // A default-constructed I2OChain has a null (shared) pointer.
00026   I2OChain::I2OChain():
00027     data_()
00028   {}
00029 
00030   I2OChain::I2OChain(toolbox::mem::Reference* pRef)
00031   {
00032     if (pRef)
00033       {
00034         I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
00035           (I2O_PRIVATE_MESSAGE_FRAME*) pRef->getDataLocation();
00036         if (!pvtMsg)
00037           {
00038             data_.reset(new detail::ChainData());
00039             data_->addFirstFragment(pRef);
00040             return;
00041           }
00042 
00043         unsigned short i2oMessageCode = pvtMsg->XFunctionCode;
00044         switch (i2oMessageCode)
00045           {
00046 
00047           case I2O_SM_PREAMBLE:
00048             {
00049               data_.reset(new detail::InitMsgData(pRef));
00050               break;
00051             }
00052 
00053           case I2O_SM_DATA:
00054             {
00055               data_.reset(new detail::EventMsgData(pRef));
00056               break;
00057             }
00058 
00059           case I2O_SM_DQM:
00060             {
00061               data_.reset(new detail::DQMEventMsgData(pRef));
00062               break;
00063             }
00064 
00065           case I2O_EVM_LUMISECTION:
00066             {
00067               data_.reset(new detail::EndLumiSectMsgData(pRef));
00068               break;
00069             }
00070 
00071           case I2O_SM_ERROR:
00072             {
00073               data_.reset(new detail::ErrorEventMsgData(pRef));
00074               break;
00075             }
00076 
00077           default:
00078             {
00079               data_.reset(new detail::ChainData());
00080               data_->addFirstFragment(pRef);
00081               data_->markCorrupt();
00082               break;
00083             }
00084 
00085           }
00086       }
00087   }
00088 
00089   I2OChain::I2OChain(I2OChain const& other) :
00090     data_(other.data_)
00091   { }
00092 
00093   I2OChain::~I2OChain()
00094   { }
00095 
00096   I2OChain& I2OChain::operator=(I2OChain const& rhs)
00097   {
00098     // This is the standard copy/swap algorithm, to obtain the strong
00099     // exception safety guarantee.
00100     I2OChain temp(rhs);
00101     swap(temp);
00102     return *this;
00103   }
00104   
00105   void I2OChain::swap(I2OChain& other)
00106   {
00107     data_.swap(other.data_);
00108   }
00109 
00110   bool I2OChain::empty() const
00111   {
00112     // We're empty if we have no ChainData, or if the ChainData object
00113     // we have is empty.
00114     return !data_ || data_->empty();
00115   }
00116 
00117 
00118   bool I2OChain::complete() const
00119   {
00120     if (!data_) return false;
00121     return data_->complete();
00122   }
00123 
00124 
00125   bool I2OChain::faulty() const
00126   {
00127     if (!data_) return false;
00128     return data_->faulty();
00129   }
00130 
00131 
00132   unsigned int I2OChain::faultyBits() const
00133   {
00134     if (!data_) return 0;
00135     return data_->faultyBits();
00136   }
00137 
00138 
00139   void I2OChain::addToChain(I2OChain &newpart)
00140   {
00141     // fragments can not be added to empty, complete, or faulty chains.
00142     if (empty())
00143       {
00144         XCEPT_RAISE(stor::exception::I2OChain,
00145                     "A fragment may not be added to an empty chain.");
00146       }
00147     if (complete())
00148       {
00149         XCEPT_RAISE(stor::exception::I2OChain,
00150                     "A fragment may not be added to a complete chain.");
00151       }
00152 
00153     // empty, complete, or faulty new parts can not be added to chains
00154     if (newpart.empty())
00155       {
00156         XCEPT_RAISE(stor::exception::I2OChain,
00157                     "An empty chain may not be added to an existing chain.");
00158       }
00159     if (newpart.complete())
00160       {
00161         XCEPT_RAISE(stor::exception::I2OChain,
00162                     "A complete chain may not be added to an existing chain.");
00163       }
00164 
00165     // require the new part and this chain to have the same fragment key
00166     FragKey thisKey = fragmentKey();
00167     FragKey thatKey = newpart.fragmentKey();
00168     // should change this to != once we implement that operator in FragKey
00169     if (thisKey < thatKey || thatKey < thisKey)
00170       {
00171         std::stringstream msg;
00172         msg << "A fragment key mismatch was detected when trying to add "
00173             << "a chain link to an existing chain. "
00174             << "Existing key values = ("
00175             << ((int)thisKey.code_) << "," << thisKey.run_ << ","
00176             << thisKey.event_ << "," << thisKey.secondaryId_ << ","
00177             << thisKey.originatorPid_ << "," << thisKey.originatorGuid_
00178             << "), new key values = ("
00179             << ((int)thatKey.code_) << "," << thatKey.run_ << ","
00180             << thatKey.event_ << "," << thatKey.secondaryId_ << ","
00181             << thatKey.originatorPid_ << "," << thatKey.originatorGuid_
00182             << ").";
00183         XCEPT_RAISE(stor::exception::I2OChain, msg.str());
00184       }
00185 
00186     // add the fragment to the current chain
00187     data_->addToChain(*(newpart.data_));
00188     newpart.release();
00189   }
00190 
00191   //void I2OChain::markComplete()
00192   //{
00193   //  // TODO:: Should we throw an exception if data_ is null? If so, what
00194   //  // type? Right now, we do nothing if data_ is null.
00195   //  if (data_) data_->markComplete();
00196   //}
00197 
00198   void I2OChain::markFaulty()
00199   {
00200     // TODO:: Should we throw an exception if data_ is null? If so, what
00201     // type? Right now, we do nothing if data_ is null.
00202     if (data_) data_->markFaulty();
00203   }
00204 
00205   unsigned long* I2OChain::getBufferData() const
00206   {
00207     return data_ ?  data_->getBufferData() : 0UL;
00208   }
00209 
00210   void I2OChain::release()
00211   {
00212     // A default-constructed chain controls no resources; we can
00213     // relinquish our control over any controlled Reference by
00214     // becoming like a default-constructed chain.
00215     I2OChain().swap(*this);
00216   }
00217 
00218   unsigned int I2OChain::messageCode() const
00219   {
00220     if (!data_) return Header::INVALID;
00221     return data_->messageCode();
00222   }
00223 
00224   unsigned short I2OChain::i2oMessageCode() const
00225   {
00226     if (!data_) return 0xffff;
00227     return data_->i2oMessageCode();
00228   }
00229 
00230   unsigned int I2OChain::rbBufferId() const
00231   {
00232     if (!data_) return 0;
00233     return data_->rbBufferId();
00234   }
00235 
00236   unsigned int I2OChain::hltLocalId() const
00237   {
00238     if (!data_) return 0;
00239     return data_->hltLocalId();
00240   }
00241 
00242   unsigned int I2OChain::hltInstance() const
00243   {
00244     if (!data_) return 0;
00245     return data_->hltInstance();
00246   }
00247 
00248   unsigned int I2OChain::hltTid() const
00249   {
00250     if (!data_) return 0;
00251     return data_->hltTid();
00252   }
00253 
00254   std::string I2OChain::hltURL() const
00255   {
00256     if (!data_) return "";
00257     return data_->hltURL();
00258   }
00259 
00260   std::string I2OChain::hltClassName() const
00261   {
00262     if (!data_) return "";
00263     return data_->hltClassName();
00264   }
00265 
00266   unsigned int I2OChain::fuProcessId() const
00267   {
00268     if (!data_) return 0;
00269     return data_->fuProcessId();
00270   }
00271 
00272   unsigned int I2OChain::fuGuid() const
00273   {
00274     if (!data_) return 0;
00275     return data_->fuGuid();
00276   }
00277 
00278   FragKey I2OChain::fragmentKey() const
00279   {
00280     if (!data_) return FragKey(Header::INVALID,0,0,0,0,0);
00281     return data_->fragmentKey();
00282   }
00283 
00284   unsigned int I2OChain::fragmentCount() const
00285   {
00286     if (!data_) return 0;
00287     return data_->fragmentCount();
00288   }
00289 
00290   utils::TimePoint_t I2OChain::creationTime() const
00291   {
00292     if (!data_) return boost::posix_time::not_a_date_time;
00293     return data_->creationTime();
00294   }
00295 
00296   utils::TimePoint_t I2OChain::lastFragmentTime() const
00297   {
00298     if (!data_) return boost::posix_time::not_a_date_time;
00299     return data_->lastFragmentTime();
00300   }
00301 
00302   utils::TimePoint_t I2OChain::staleWindowStartTime() const
00303   {
00304     if (!data_) return boost::posix_time::not_a_date_time;
00305     return data_->staleWindowStartTime();
00306   }
00307 
00308   void I2OChain::addToStaleWindowStartTime(const utils::Duration_t duration)
00309   {
00310     if (!data_) return;
00311     data_->addToStaleWindowStartTime(duration);
00312   }
00313 
00314   void I2OChain::resetStaleWindowStartTime()
00315   {
00316     if (!data_) return;
00317     data_->resetStaleWindowStartTime();
00318   }
00319 
00320   void I2OChain::tagForStream(StreamID streamId)
00321   {
00322     if (!data_)
00323       {
00324         std::stringstream msg;
00325         msg << "An empty chain can not be tagged for a specific ";
00326         msg << "event stream.";
00327         XCEPT_RAISE(stor::exception::I2OChain, msg.str());
00328       }
00329     data_->tagForStream(streamId);
00330   }
00331 
00332   void I2OChain::tagForEventConsumer(QueueID queueId)
00333   {
00334     if (!data_)
00335       {
00336         std::stringstream msg;
00337         msg << "An empty chain can not be tagged for a specific ";
00338         msg << "event consumer.";
00339         XCEPT_RAISE(stor::exception::I2OChain, msg.str());
00340       }
00341     data_->tagForEventConsumer(queueId);
00342   }
00343 
00344   void I2OChain::tagForDQMEventConsumer(QueueID queueId)
00345   {
00346     if (!data_)
00347       {
00348         std::stringstream msg;
00349         msg << "An empty chain can not be tagged for a specific ";
00350         msg << "DQM event consumer.";
00351         XCEPT_RAISE(stor::exception::I2OChain, msg.str());
00352       }
00353     data_->tagForDQMEventConsumer(queueId);
00354   }
00355 
00356   bool I2OChain::isTaggedForAnyStream() const
00357   {
00358     if (!data_) return false;
00359     return data_->isTaggedForAnyStream();
00360   }
00361 
00362   bool I2OChain::isTaggedForAnyEventConsumer() const
00363   {
00364     if (!data_) return false;
00365     return data_->isTaggedForAnyEventConsumer();
00366   }
00367 
00368   bool I2OChain::isTaggedForAnyDQMEventConsumer() const
00369   {
00370     if (!data_) return false;
00371     return data_->isTaggedForAnyDQMEventConsumer();
00372   }
00373 
00374   std::vector<StreamID> I2OChain::getStreamTags() const
00375   {
00376     if (!data_)
00377       {
00378         std::vector<StreamID> tmpList;
00379         return tmpList;
00380       }
00381     return data_->getStreamTags();
00382   }
00383 
00384   QueueIDs I2OChain::getEventConsumerTags() const
00385   {
00386     if (!data_)
00387       {
00388         QueueIDs tmpList;
00389         return tmpList;
00390       }
00391     return data_->getEventConsumerTags();
00392   }
00393 
00394   QueueIDs I2OChain::getDQMEventConsumerTags() const
00395   {
00396     if (!data_)
00397       {
00398         QueueIDs tmpList;
00399         return tmpList;
00400       }
00401     return data_->getDQMEventConsumerTags();
00402   }
00403 
00404   unsigned int I2OChain::droppedEventsCount() const
00405   {
00406     if (!data_)
00407       {
00408         std::stringstream msg;
00409         msg << "A dropped event count cannot be retrieved from an empty chain";
00410         XCEPT_RAISE(stor::exception::I2OChain, msg.str());
00411       }
00412     return data_->droppedEventsCount();
00413   }
00414 
00415   void I2OChain::setDroppedEventsCount(unsigned int count)
00416   {
00417     if (!data_)
00418       {
00419         std::stringstream msg;
00420         msg << "A dropped event count cannot be added to an empty chain";
00421         XCEPT_RAISE(stor::exception::I2OChain, msg.str());
00422       }
00423     data_->setDroppedEventsCount(count);
00424   }
00425 
00426   size_t I2OChain::memoryUsed() const
00427   {
00428     if (!data_) return 0;
00429     return data_->memoryUsed();
00430   }
00431 
00432   unsigned long I2OChain::totalDataSize() const
00433   {
00434     if (!data_) return 0UL;
00435     return data_->totalDataSize();
00436   }
00437 
00438   unsigned long I2OChain::dataSize(int fragmentIndex) const
00439   {
00440     if (!data_) return 0UL;
00441     return data_->dataSize(fragmentIndex);
00442   }
00443 
00444   unsigned char* I2OChain::dataLocation(int fragmentIndex) const
00445   {
00446     if (!data_) return 0UL;
00447     return data_->dataLocation(fragmentIndex);
00448   }
00449 
00450   unsigned int I2OChain::getFragmentID(int fragmentIndex) const
00451   {
00452     if (!data_) return 0;
00453     return data_->getFragmentID(fragmentIndex);
00454   }
00455 
00456   unsigned long I2OChain::headerSize() const
00457   {
00458     if (!data_) return 0UL;
00459     return data_->headerSize();
00460   }
00461 
00462   unsigned char* I2OChain::headerLocation() const
00463   {
00464     if (!data_) return 0UL;
00465     return data_->headerLocation();
00466   }
00467 
00468   unsigned int I2OChain::
00469   copyFragmentsIntoBuffer(std::vector<unsigned char>& targetBuffer) const
00470   {
00471     if (!data_) return 0;
00472     return data_->copyFragmentsIntoBuffer(targetBuffer);
00473   }
00474 
00475   std::string I2OChain::topFolderName() const
00476   {
00477     if( !data_ )
00478       {
00479         XCEPT_RAISE( stor::exception::I2OChain,
00480                      "The top folder name can not be determined from an empty I2OChain." );
00481       }
00482     return data_->topFolderName();
00483   }
00484 
00485   DQMKey I2OChain::dqmKey() const
00486   {
00487     if( !data_ )
00488       {
00489         XCEPT_RAISE( stor::exception::I2OChain,
00490                      "The DQM key can not be determined from an empty I2OChain." );
00491       }
00492     return data_->dqmKey();
00493   }
00494 
00495   std::string I2OChain::outputModuleLabel() const
00496   {
00497     if (!data_)
00498       {
00499         XCEPT_RAISE(stor::exception::I2OChain,
00500           "The output module label can not be determined from an empty I2OChain.");
00501       }
00502     return data_->outputModuleLabel();
00503   }
00504 
00505   uint32_t I2OChain::outputModuleId() const
00506   {
00507     if (!data_)
00508       {
00509         XCEPT_RAISE(stor::exception::I2OChain,
00510           "The output module ID can not be determined from an empty I2OChain.");
00511       }
00512     return data_->outputModuleId();
00513   }
00514 
00515   uint32_t I2OChain::nExpectedEPs() const
00516   {
00517     if (!data_)
00518       {
00519         XCEPT_RAISE(stor::exception::I2OChain,
00520           "The slave EP count can not be determined from an empty I2OChain.");
00521       }
00522     return data_->nExpectedEPs();
00523   }
00524 
00525   void I2OChain::hltTriggerNames(Strings& nameList) const
00526   {
00527     if (!data_)
00528       {
00529         XCEPT_RAISE(stor::exception::I2OChain,
00530           "HLT trigger names can not be determined from an empty I2OChain.");
00531       }
00532     data_->hltTriggerNames(nameList);
00533   }
00534 
00535   void I2OChain::hltTriggerSelections(Strings& nameList) const
00536   {
00537     if (!data_)
00538       {
00539         XCEPT_RAISE(stor::exception::I2OChain,
00540           "HLT trigger selections can not be determined from an empty I2OChain.");
00541       }
00542     data_->hltTriggerSelections(nameList);
00543   }
00544 
00545   void I2OChain::l1TriggerNames(Strings& nameList) const
00546   {
00547     if (!data_)
00548       {
00549         XCEPT_RAISE(stor::exception::I2OChain,
00550           "L1 trigger names can not be determined from an empty I2OChain.");
00551       }
00552     data_->l1TriggerNames(nameList);
00553   }
00554 
00555   uint32_t I2OChain::hltTriggerCount() const
00556   {
00557     if (!data_)
00558       {
00559         XCEPT_RAISE(stor::exception::I2OChain,
00560           "The number of HLT trigger bits can not be determined from an empty I2OChain.");
00561       }
00562     return data_->hltTriggerCount();
00563   }
00564 
00565   void I2OChain::hltTriggerBits(std::vector<unsigned char>& bitList) const
00566   {
00567     if (!data_)
00568       {
00569         XCEPT_RAISE(stor::exception::I2OChain,
00570           "HLT trigger bits can not be determined from an empty I2OChain.");
00571       }
00572     data_->hltTriggerBits(bitList);
00573   }
00574 
00575   void I2OChain::assertRunNumber(uint32_t runNumber)
00576   {
00577     if (!data_)
00578       {
00579         XCEPT_RAISE(stor::exception::I2OChain,
00580           "The run number can not be checked for an empty I2OChain.");
00581       }
00582     return data_->assertRunNumber(runNumber);
00583   }
00584 
00585   uint32_t I2OChain::runNumber() const
00586   {
00587     if (!data_)
00588       {
00589         XCEPT_RAISE(stor::exception::I2OChain,
00590           "The run number can not be determined from an empty I2OChain.");
00591       }
00592     return data_->runNumber();
00593   }
00594 
00595   uint32_t I2OChain::lumiSection() const
00596   {
00597     if (!data_)
00598       {
00599         XCEPT_RAISE(stor::exception::I2OChain,
00600           "The luminosity section can not be determined from an empty I2OChain.");
00601       }
00602     return data_->lumiSection();
00603   }
00604 
00605   uint32_t I2OChain::eventNumber() const
00606   {
00607     if (!data_)
00608       {
00609         XCEPT_RAISE(stor::exception::I2OChain,
00610           "The event number can not be determined from an empty I2OChain.");
00611       }
00612     return data_->eventNumber();
00613   }
00614 
00615   uint32_t I2OChain::adler32Checksum() const
00616   {
00617     if (!data_)
00618       {
00619         XCEPT_RAISE(stor::exception::I2OChain,
00620           "The adler32 checksum can not be determined from an empty I2OChain.");
00621       }
00622     return data_->adler32Checksum();
00623   }
00624 
00625   bool I2OChain::isEndOfLumiSectionMessage() const
00626   {
00627     if (!data_) return false;
00628     return data_->isEndOfLumiSectionMessage();
00629   }
00630 
00631 } // namespace stor
00632