17 #include "interface/shared/i2oXFunctionCodes.h"
18 #include "interface/shared/version.h"
28 const unsigned int messageCode) :
31 dqmEventConsumerTags_(),
34 faultyBits_(INCOMPLETE_MESSAGE),
35 messageCode_(messageCode),
36 i2oMessageCode_(i2oMessageCode),
39 expectedNumberOfFragments_(0),
47 #ifdef STOR_DEBUG_CORRUPT_MESSAGES
48 double r =
rand()/
static_cast<double>(RAND_MAX);
56 std::cout <<
"Simulating faulty I2O message" << std::endl;
59 #endif // STOR_DEBUG_CORRUPT_MESSAGES
76 try { ref_->release(); }
94 return (faultyBits_ != 0);
96 return (faultyBits_ != INCOMPLETE_MESSAGE);
107 ((faultyBits_ & INVALID_INITIAL_REFERENCE & ~INCOMPLETE_MESSAGE) == 0) &&
108 ((faultyBits_ & CORRUPT_INITIAL_HEADER & ~INCOMPLETE_MESSAGE) == 0);
113 return ( (faultyBits_ & ~WRONG_CHECKSUM) == 0);
120 XCEPT_RAISE(stor::exception::I2OChain,
"Cannot add a first fragment to a non-empty I2OChain.");
129 fragKey_.secondaryId_ =
static_cast<uint32_t
>(
130 (uintptr_t)pRef->getDataLocation()
135 fragKey_.secondaryId_ =
static_cast<uint32_t
>(
time(0) );
141 lastFragmentTime_ = creationTime_;
142 staleWindowStartTime_ = creationTime_;
146 int workingIndex = -1;
148 if (validateDataLocation(pRef, INVALID_INITIAL_REFERENCE) &&
149 validateMessageSize(pRef, CORRUPT_INITIAL_HEADER) &&
150 validateFragmentIndexAndCount(pRef, CORRUPT_INITIAL_HEADER))
154 expectedNumberOfFragments_ = smMsg->
numFrames;
155 validateFragmentOrder(pRef, workingIndex);
159 toolbox::mem::Reference* curRef = pRef->getNextReference();
164 if (validateDataLocation(curRef, INVALID_SECONDARY_REFERENCE) &&
165 validateMessageSize(curRef, CORRUPT_SECONDARY_HEADER) &&
166 validateFragmentIndexAndCount(curRef, CORRUPT_SECONDARY_HEADER))
168 validateExpectedFragmentCount(curRef, TOTAL_COUNT_MISMATCH);
169 validateFragmentOrder(curRef, workingIndex);
170 validateMessageCode(curRef, i2oMessageCode_);
173 curRef = curRef->getNextReference();
177 checkForCompleteness();
184 addFirstFragment(newpart.
ref_);
188 if (parsable() && newpart.
parsable())
191 toolbox::mem::Reference* newRef = newpart.
ref_;
195 toolbox::mem::Reference* nextNewRef = newRef->getNextReference();
196 newRef->setNextReference(0);
204 if (newRef == newpart.
ref_) {newRef = newpart.
ref_->duplicate();}
207 bool fragmentWasAdded =
false;
216 unsigned int newFragmentTotalCount = thatMsg->
numFrames;
217 if (newFragmentTotalCount != expectedNumberOfFragments_)
219 faultyBits_ |= TOTAL_COUNT_MISMATCH;
225 unsigned int firstIndex = fragMsg->
frameCount;
227 if (newIndex < firstIndex)
229 newRef->setNextReference(ref_);
231 fragmentWasAdded =
true;
238 toolbox::mem::Reference* curRef = ref_;
239 for (
unsigned int idx = 0; idx < fragmentCount_; ++idx)
247 if (newIndex == curIndex)
249 faultyBits_ |= DUPLICATE_FRAGMENT;
250 newRef->setNextReference(curRef->getNextReference());
251 curRef->setNextReference(newRef);
252 fragmentWasAdded =
true;
259 toolbox::mem::Reference* nextRef = curRef->getNextReference();
262 curRef->setNextReference(newRef);
263 fragmentWasAdded =
true;
271 if (newIndex > curIndex && newIndex < nextIndex)
273 newRef->setNextReference(curRef->getNextReference());
274 curRef->setNextReference(newRef);
275 fragmentWasAdded =
true;
284 if (!fragmentWasAdded)
288 XCEPT_RAISE(stor::exception::I2OChain,
289 "A fragment was unable to be added to a chain.");
302 toolbox::mem::Reference* curRef = newpart.
ref_;
305 curRef = curRef->getNextReference();
309 toolbox::mem::Reference* lastRef = ref_;
310 curRef = ref_->getNextReference();
313 curRef = curRef->getNextReference();
315 lastRef->setNextReference(newpart.
ref_->duplicate());
319 staleWindowStartTime_ = lastFragmentTime_;
333 staleWindowStartTime_ = lastFragmentTime_;
339 checkForCompleteness();
344 if ((fragmentCount_ == expectedNumberOfFragments_) &&
345 ((faultyBits_ & (TOTAL_COUNT_MISMATCH | FRAGMENTS_OUT_OF_ORDER | DUPLICATE_FRAGMENT)) == 0))
351 faultyBits_ &= ~INCOMPLETE_MESSAGE;
353 validateAdler32Checksum();
358 faultyBits_ |= EXTERNALLY_REQUESTED;
363 faultyBits_ |= CORRUPT_INITIAL_HEADER;
369 ?
static_cast<unsigned long*
>(ref_->getDataLocation())
393 size_t memoryUsed = 0;
394 toolbox::mem::Reference* curRef = ref_;
397 memoryUsed += curRef->getDataSize();
398 curRef = curRef->getNextReference();
405 unsigned long totalSize = 0;
406 toolbox::mem::Reference* curRef = ref_;
407 for (
unsigned int idx = 0; idx < fragmentCount_; ++idx)
409 I2O_MESSAGE_FRAME *i2oMsg =
410 (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
419 totalSize += (i2oMsg->MessageSize*4);
422 curRef = curRef->getNextReference();
429 toolbox::mem::Reference* curRef = ref_;
430 for (
int idx = 0; idx < fragmentIndex; ++idx)
432 curRef = curRef->getNextReference();
435 I2O_MESSAGE_FRAME *i2oMsg =
436 (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
445 return (i2oMsg->MessageSize*4);
452 toolbox::mem::Reference* curRef = ref_;
453 for (
int idx = 0; idx < fragmentIndex; ++idx)
455 curRef = curRef->getNextReference();
460 return do_fragmentLocation(static_cast<unsigned char*>
461 (curRef->getDataLocation()));
465 return static_cast<unsigned char*
>(curRef->getDataLocation());
471 toolbox::mem::Reference* curRef = ref_;
472 for (
int idx = 0; idx < fragmentIndex; ++idx)
474 curRef = curRef->getNextReference();
492 unsigned long fullSize = totalDataSize();
493 if (targetBuffer.capacity() < fullSize)
495 targetBuffer.resize(fullSize);
497 unsigned char* targetLoc = (
unsigned char*)&targetBuffer[0];
499 toolbox::mem::Reference* curRef = ref_;
502 unsigned char* fragmentLoc =
503 (
unsigned char*) curRef->getDataLocation();
504 unsigned long sourceSize = 0;
505 unsigned char* sourceLoc = 0;
512 sourceLoc = do_fragmentLocation(fragmentLoc);
514 else if (fragmentLoc)
516 I2O_MESSAGE_FRAME *i2oMsg = (I2O_MESSAGE_FRAME*) fragmentLoc;
517 sourceSize = i2oMsg->MessageSize * 4;
518 sourceLoc = fragmentLoc;
523 std::copy(sourceLoc, sourceLoc+sourceSize, targetLoc);
524 targetLoc += sourceSize;
527 curRef = curRef->getNextReference();
530 return static_cast<unsigned int>(fullSize);
535 return do_headerSize();
540 return do_headerLocation();
551 std::string URL(smMsg->
hltURL, size);
556 return "unavailable";
573 return "unavailable";
579 return do_outputModuleId();
585 return do_outputModuleLabel();
590 return do_topFolderName();
600 do_hltTriggerNames(nameList);
605 do_hltTriggerSelections(nameList);
610 do_l1TriggerNames(nameList);
615 return do_hltTriggerCount();
621 do_hltTriggerBits(bitList);
626 do_assertRunNumber(runNumber);
631 return do_runNumber();
636 return do_lumiSection();
641 return do_eventNumber();
646 return do_adler32Checksum();
651 streamTags_.push_back(streamId);
656 eventConsumerTags_.push_back(queueId);
661 dqmEventConsumerTags_.push_back(queueId);
671 return eventConsumerTags_;
676 return dqmEventConsumerTags_;
681 return ( i2oMessageCode_ == I2O_EVM_LUMISECTION );
688 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
689 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
692 faultyBits_ |= maskToUse;
702 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
703 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
704 if ((
size_t)(pvtMsg->StdMessageFrame.MessageSize*4) <
707 faultyBits_ |= maskToUse;
721 faultyBits_ |= maskToUse;
733 if (smMsg->
numFrames != expectedNumberOfFragments_)
735 faultyBits_ |= maskToUse;
745 int problemCount = 0;
748 int thisIndex =
static_cast<int>(smMsg->
frameCount);
749 if (thisIndex == indexValue)
751 faultyBits_ |= DUPLICATE_FRAGMENT;
754 else if (thisIndex < indexValue)
756 faultyBits_ |= FRAGMENTS_OUT_OF_ORDER;
759 indexValue = thisIndex;
760 return (problemCount == 0);
765 unsigned short expectedI2OMessageCode)
767 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
768 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
769 if (pvtMsg->XFunctionCode != expectedI2OMessageCode)
771 faultyBits_ |= CORRUPT_SECONDARY_HEADER;
779 if ( !complete() || !headerOkay() )
return false;
781 const uint32_t expected = adler32Checksum();
782 if (expected == 0)
return false;
784 const uint32_t calculated = calculateAdler32();
786 if ( calculated != expected )
788 faultyBits_ |= WRONG_CHECKSUM;
796 uint32_t adler = adler32(0
L, 0, 0);
798 toolbox::mem::Reference* curRef = ref_;
804 const unsigned long headerSize = do_headerSize();
807 const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
808 if ( headerSize > payloadSize )
811 offset = headerSize - payloadSize;
815 const unsigned char* dataLocation =
816 do_fragmentLocation((
unsigned char*)curRef->getDataLocation()) + headerSize;
817 adler = adler32(adler, dataLocation, smMsg->
dataSize - headerSize);
820 curRef = curRef->getNextReference();
826 const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
827 if ( offset > payloadSize )
829 offset -= payloadSize;
833 const unsigned char* dataLocation =
834 do_fragmentLocation((
unsigned char*)curRef->getDataLocation()) + offset;
835 adler = adler32(adler, dataLocation, smMsg->
dataSize - offset);
839 curRef = curRef->getNextReference();
847 std::stringstream
msg;
848 msg <<
"An output module ID is only available from a valid, ";
849 msg <<
"complete INIT or Event message.";
850 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
855 std::stringstream
msg;
856 msg <<
"An output module label is only available from a valid, ";
857 msg <<
"complete INIT message.";
858 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
863 std::stringstream
msg;
864 msg <<
"A top folder name is only available from a valid, ";
865 msg <<
"complete DQM event message.";
866 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
871 std::stringstream
msg;
872 msg <<
"The DQM key is only available from a valid, ";
873 msg <<
"complete DQM event message.";
874 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
879 std::stringstream
msg;
880 msg <<
"The HLT trigger names are only available from a valid, ";
881 msg <<
"complete INIT message.";
882 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
887 std::stringstream
msg;
888 msg <<
"The HLT trigger selections are only available from a valid, ";
889 msg <<
"complete INIT message.";
890 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
895 std::stringstream
msg;
896 msg <<
"The L1 trigger names are only available from a valid, ";
897 msg <<
"complete INIT message.";
898 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
903 std::stringstream
msg;
904 msg <<
"An HLT trigger count is only available from a valid, ";
905 msg <<
"complete Event message.";
906 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
912 std::stringstream
msg;
913 msg <<
"The HLT trigger bits are only available from a valid, ";
914 msg <<
"complete Event message.";
915 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
920 std::stringstream
msg;
921 msg <<
"A run number is only available from a valid, ";
922 msg <<
"complete EVENT or ERROR_EVENT message.";
923 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
928 std::stringstream
msg;
929 msg <<
"A luminosity section is only available from a valid, ";
930 msg <<
"complete EVENT or ERROR_EVENT message.";
931 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
936 std::stringstream
msg;
937 msg <<
"An event number is only available from a valid, ";
938 msg <<
"complete EVENT or ERROR_EVENT message.";
939 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
TimePoint_t getCurrentTime()
virtual void do_hltTriggerSelections(Strings &nameList) const
bool validateFragmentIndexAndCount(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
size_t memoryUsed() const
unsigned char * dataLocation(int fragmentIndex) const
unsigned short i2oMessageCode_
std::vector< StreamID > const & getStreamTags() const
virtual uint32_t do_lumiSection() const
unsigned long headerSize() const
uint32_t runNumber() const
unsigned char * headerLocation() const
virtual void do_l1TriggerNames(Strings &nameList) const
bool validateMessageSize(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
void hltTriggerNames(Strings &nameList) const
toolbox::mem::Reference * ref_
std::vector< QueueID > QueueIDs
unsigned long * getBufferData() const
std::string hltURL() const
uint32_t adler32Checksum() const
std::vector< StreamID > streamTags_
utils::TimePoint_t creationTime_
void addFirstFragment(toolbox::mem::Reference *)
virtual uint32_t do_outputModuleId() const
void assertRunNumber(uint32_t runNumber)
uint32_t outputModuleId() const
virtual void do_hltTriggerNames(Strings &nameList) const
void addToChain(ChainData const &)
unsigned int getFragmentID(int fragmentIndex) const
unsigned int expectedNumberOfFragments_
utils::TimePoint_t lastFragmentTime_
bool validateDataLocation(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
virtual uint32_t do_eventNumber() const
ChainData(unsigned short i2oMessageCode=0x9999, unsigned int messageCode=Header::INVALID)
unsigned int copyFragmentsIntoBuffer(std::vector< unsigned char > &buff) const
void swap(ChainData &other)
bool validateAdler32Checksum()
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
uint32_t eventNumber() const
bool validateExpectedFragmentCount(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
QueueIDs const & getEventConsumerTags() const
bool isEndOfLumiSectionMessage() const
virtual void do_hltTriggerBits(std::vector< unsigned char > &bitList) const
unsigned int offset(bool)
utils::TimePoint_t creationTime() const
uint32_t lumiSection() const
void hltTriggerSelections(Strings &nameList) const
virtual uint32_t do_hltTriggerCount() const
virtual DQMKey do_dqmKey() const
unsigned long totalDataSize() const
void tagForDQMEventConsumer(QueueID)
uint32_t hltTriggerCount() const
void tagForEventConsumer(QueueID)
void checkForCompleteness()
unsigned int fragmentCount_
void hltTriggerBits(std::vector< unsigned char > &bitList) const
std::string topFolderName() const
QueueIDs const & getDQMEventConsumerTags() const
uint32_t calculateAdler32() const
virtual std::string do_outputModuleLabel() const
bool validateMessageCode(toolbox::mem::Reference *ref, unsigned short expectedI2OMessageCode)
unsigned long dataSize(int fragmentIndex) const
QueueIDs eventConsumerTags_
char hltURL[MAX_I2O_SM_URLCHARS]
virtual std::string do_topFolderName() const
char hltClassName[MAX_I2O_SM_URLCHARS]
void tagForStream(StreamID)
unsigned int messageCode_
std::string hltClassName() const
#define MAX_I2O_SM_URLCHARS
virtual uint32_t do_runNumber() const
std::string outputModuleLabel() const
QueueIDs dqmEventConsumerTags_
bool validateFragmentOrder(toolbox::mem::Reference *ref, int &indexValue)
tuple size
Write out results.
std::string className(const T &t)
unsigned int faultyBits() const
utils::TimePoint_t staleWindowStartTime_
void l1TriggerNames(Strings &nameList) const