16 #include "interface/shared/i2oXFunctionCodes.h"
17 #include "interface/shared/version.h"
27 const unsigned int messageCode) :
30 dqmEventConsumerTags_(),
33 faultyBits_(INCOMPLETE_MESSAGE),
34 messageCode_(messageCode),
35 i2oMessageCode_(i2oMessageCode),
38 expectedNumberOfFragments_(0),
46 #ifdef STOR_DEBUG_CORRUPT_MESSAGES
47 double r =
rand()/
static_cast<double>(RAND_MAX);
55 std::cout <<
"Simulating faulty I2O message" << std::endl;
58 #endif // STOR_DEBUG_CORRUPT_MESSAGES
75 try { ref_->release(); }
93 return (faultyBits_ != 0);
95 return (faultyBits_ != INCOMPLETE_MESSAGE);
106 ((faultyBits_ & INVALID_INITIAL_REFERENCE & ~INCOMPLETE_MESSAGE) == 0) &&
107 ((faultyBits_ & CORRUPT_INITIAL_HEADER & ~INCOMPLETE_MESSAGE) == 0);
112 return ( (faultyBits_ & ~WRONG_CHECKSUM) == 0);
119 XCEPT_RAISE(stor::exception::I2OChain,
"Cannot add a first fragment to a non-empty I2OChain.");
128 fragKey_.secondaryId_ =
static_cast<uint32_t
>(
129 (uintptr_t)pRef->getDataLocation()
134 fragKey_.secondaryId_ =
static_cast<uint32_t
>(
time(0) );
140 lastFragmentTime_ = creationTime_;
141 staleWindowStartTime_ = creationTime_;
145 int workingIndex = -1;
147 if (validateDataLocation(pRef, INVALID_INITIAL_REFERENCE) &&
148 validateMessageSize(pRef, CORRUPT_INITIAL_HEADER) &&
149 validateFragmentIndexAndCount(pRef, CORRUPT_INITIAL_HEADER))
153 expectedNumberOfFragments_ = smMsg->
numFrames;
154 validateFragmentOrder(pRef, workingIndex);
158 toolbox::mem::Reference* curRef = pRef->getNextReference();
163 if (validateDataLocation(curRef, INVALID_SECONDARY_REFERENCE) &&
164 validateMessageSize(curRef, CORRUPT_SECONDARY_HEADER) &&
165 validateFragmentIndexAndCount(curRef, CORRUPT_SECONDARY_HEADER))
167 validateExpectedFragmentCount(curRef, TOTAL_COUNT_MISMATCH);
168 validateFragmentOrder(curRef, workingIndex);
169 validateMessageCode(curRef, i2oMessageCode_);
172 curRef = curRef->getNextReference();
176 checkForCompleteness();
183 addFirstFragment(newpart.
ref_);
187 if (parsable() && newpart.
parsable())
190 toolbox::mem::Reference* newRef = newpart.
ref_;
194 toolbox::mem::Reference* nextNewRef = newRef->getNextReference();
195 newRef->setNextReference(0);
203 if (newRef == newpart.
ref_) {newRef = newpart.
ref_->duplicate();}
206 bool fragmentWasAdded =
false;
215 unsigned int newFragmentTotalCount = thatMsg->
numFrames;
216 if (newFragmentTotalCount != expectedNumberOfFragments_)
218 faultyBits_ |= TOTAL_COUNT_MISMATCH;
224 unsigned int firstIndex = fragMsg->
frameCount;
226 if (newIndex < firstIndex)
228 newRef->setNextReference(ref_);
230 fragmentWasAdded =
true;
237 toolbox::mem::Reference* curRef = ref_;
238 for (
unsigned int idx = 0; idx < fragmentCount_; ++idx)
246 if (newIndex == curIndex)
248 faultyBits_ |= DUPLICATE_FRAGMENT;
249 newRef->setNextReference(curRef->getNextReference());
250 curRef->setNextReference(newRef);
251 fragmentWasAdded =
true;
258 toolbox::mem::Reference* nextRef = curRef->getNextReference();
261 curRef->setNextReference(newRef);
262 fragmentWasAdded =
true;
270 if (newIndex > curIndex && newIndex < nextIndex)
272 newRef->setNextReference(curRef->getNextReference());
273 curRef->setNextReference(newRef);
274 fragmentWasAdded =
true;
283 if (!fragmentWasAdded)
287 XCEPT_RAISE(stor::exception::I2OChain,
288 "A fragment was unable to be added to a chain.");
301 toolbox::mem::Reference* curRef = newpart.
ref_;
304 curRef = curRef->getNextReference();
308 toolbox::mem::Reference* lastRef = ref_;
309 curRef = ref_->getNextReference();
312 curRef = curRef->getNextReference();
314 lastRef->setNextReference(newpart.
ref_->duplicate());
318 staleWindowStartTime_ = lastFragmentTime_;
332 staleWindowStartTime_ = lastFragmentTime_;
338 checkForCompleteness();
343 if ((fragmentCount_ == expectedNumberOfFragments_) &&
344 ((faultyBits_ & (TOTAL_COUNT_MISMATCH | FRAGMENTS_OUT_OF_ORDER | DUPLICATE_FRAGMENT)) == 0))
350 faultyBits_ &= ~INCOMPLETE_MESSAGE;
352 validateAdler32Checksum();
357 faultyBits_ |= EXTERNALLY_REQUESTED;
362 faultyBits_ |= CORRUPT_INITIAL_HEADER;
368 ?
static_cast<unsigned long*
>(ref_->getDataLocation())
392 size_t memoryUsed = 0;
393 toolbox::mem::Reference* curRef = ref_;
396 memoryUsed += curRef->getDataSize();
397 curRef = curRef->getNextReference();
404 unsigned long totalSize = 0;
405 toolbox::mem::Reference* curRef = ref_;
406 for (
unsigned int idx = 0; idx < fragmentCount_; ++idx)
408 I2O_MESSAGE_FRAME *i2oMsg =
409 (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
418 totalSize += (i2oMsg->MessageSize*4);
421 curRef = curRef->getNextReference();
428 toolbox::mem::Reference* curRef = ref_;
429 for (
int idx = 0; idx < fragmentIndex; ++idx)
431 curRef = curRef->getNextReference();
434 I2O_MESSAGE_FRAME *i2oMsg =
435 (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
444 return (i2oMsg->MessageSize*4);
451 toolbox::mem::Reference* curRef = ref_;
452 for (
int idx = 0; idx < fragmentIndex; ++idx)
454 curRef = curRef->getNextReference();
459 return do_fragmentLocation(static_cast<unsigned char*>
460 (curRef->getDataLocation()));
464 return static_cast<unsigned char*
>(curRef->getDataLocation());
470 toolbox::mem::Reference* curRef = ref_;
471 for (
int idx = 0; idx < fragmentIndex; ++idx)
473 curRef = curRef->getNextReference();
491 unsigned long fullSize = totalDataSize();
492 if (targetBuffer.capacity() < fullSize)
494 targetBuffer.resize(fullSize);
496 unsigned char* targetLoc = (
unsigned char*)&targetBuffer[0];
498 toolbox::mem::Reference* curRef = ref_;
501 unsigned char* fragmentLoc =
502 (
unsigned char*) curRef->getDataLocation();
503 unsigned long sourceSize = 0;
504 unsigned char* sourceLoc = 0;
511 sourceLoc = do_fragmentLocation(fragmentLoc);
513 else if (fragmentLoc)
515 I2O_MESSAGE_FRAME *i2oMsg = (I2O_MESSAGE_FRAME*) fragmentLoc;
516 sourceSize = i2oMsg->MessageSize * 4;
517 sourceLoc = fragmentLoc;
522 std::copy(sourceLoc, sourceLoc+sourceSize, targetLoc);
523 targetLoc += sourceSize;
526 curRef = curRef->getNextReference();
529 return static_cast<unsigned int>(fullSize);
534 return do_headerSize();
539 return do_headerLocation();
550 std::string URL(smMsg->
hltURL, size);
555 return "unavailable";
572 return "unavailable";
578 return do_outputModuleLabel();
583 return do_outputModuleId();
588 return do_nExpectedEPs();
593 return do_topFolderName();
603 do_hltTriggerNames(nameList);
608 do_hltTriggerSelections(nameList);
613 do_l1TriggerNames(nameList);
618 return do_hltTriggerCount();
624 do_hltTriggerBits(bitList);
629 do_assertRunNumber(runNumber);
634 return do_runNumber();
639 return do_lumiSection();
644 return do_eventNumber();
649 return do_adler32Checksum();
654 streamTags_.push_back(streamId);
659 eventConsumerTags_.push_back(queueId);
664 dqmEventConsumerTags_.push_back(queueId);
674 return eventConsumerTags_;
679 return dqmEventConsumerTags_;
684 return do_droppedEventsCount();
689 do_setDroppedEventsCount(count);
694 return ( i2oMessageCode_ == I2O_EVM_LUMISECTION );
701 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
702 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
705 faultyBits_ |= maskToUse;
715 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
716 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
717 if ((
size_t)(pvtMsg->StdMessageFrame.MessageSize*4) <
720 faultyBits_ |= maskToUse;
734 faultyBits_ |= maskToUse;
746 if (smMsg->
numFrames != expectedNumberOfFragments_)
748 faultyBits_ |= maskToUse;
758 int problemCount = 0;
761 int thisIndex =
static_cast<int>(smMsg->
frameCount);
762 if (thisIndex == indexValue)
764 faultyBits_ |= DUPLICATE_FRAGMENT;
767 else if (thisIndex < indexValue)
769 faultyBits_ |= FRAGMENTS_OUT_OF_ORDER;
772 indexValue = thisIndex;
773 return (problemCount == 0);
778 unsigned short expectedI2OMessageCode)
780 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
781 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
782 if (pvtMsg->XFunctionCode != expectedI2OMessageCode)
784 faultyBits_ |= CORRUPT_SECONDARY_HEADER;
792 if ( !complete() || !headerOkay() )
return false;
794 const uint32_t expected = adler32Checksum();
795 if (expected == 0)
return false;
797 const uint32_t calculated = calculateAdler32();
799 if ( calculated != expected )
801 faultyBits_ |= WRONG_CHECKSUM;
809 uint32_t adler = adler32(0
L, 0, 0);
811 toolbox::mem::Reference* curRef = ref_;
817 const unsigned long headerSize = do_headerSize();
820 const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
821 if ( headerSize > payloadSize )
824 offset = headerSize - payloadSize;
828 const unsigned char* dataLocation =
829 do_fragmentLocation((
unsigned char*)curRef->getDataLocation()) + headerSize;
830 adler = adler32(adler, dataLocation, smMsg->
dataSize - headerSize);
833 curRef = curRef->getNextReference();
839 const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
840 if ( offset > payloadSize )
842 offset -= payloadSize;
846 const unsigned char* dataLocation =
847 do_fragmentLocation((
unsigned char*)curRef->getDataLocation()) + offset;
848 adler = adler32(adler, dataLocation, smMsg->
dataSize - offset);
852 curRef = curRef->getNextReference();
860 std::stringstream
msg;
861 msg <<
"An output module label is only available from a valid, ";
862 msg <<
"complete INIT message.";
863 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
868 std::stringstream
msg;
869 msg <<
"An output module ID is only available from a valid, ";
870 msg <<
"complete INIT or Event message.";
871 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
876 std::stringstream
msg;
877 msg <<
"The number of slave EPs is only available from a valid, ";
878 msg <<
"complete INIT message.";
879 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
884 std::stringstream
msg;
885 msg <<
"A top folder name is only available from a valid, ";
886 msg <<
"complete DQM event message.";
887 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
892 std::stringstream
msg;
893 msg <<
"The DQM key is only available from a valid, ";
894 msg <<
"complete DQM event message.";
895 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
900 std::stringstream
msg;
901 msg <<
"The HLT trigger names are only available from a valid, ";
902 msg <<
"complete INIT message.";
903 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
908 std::stringstream
msg;
909 msg <<
"The HLT trigger selections are only available from a valid, ";
910 msg <<
"complete INIT message.";
911 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
916 std::stringstream
msg;
917 msg <<
"The L1 trigger names are only available from a valid, ";
918 msg <<
"complete INIT message.";
919 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
924 std::stringstream
msg;
925 msg <<
"An HLT trigger count is only available from a valid, ";
926 msg <<
"complete Event message.";
927 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
933 std::stringstream
msg;
934 msg <<
"The HLT trigger bits are only available from a valid, ";
935 msg <<
"complete Event message.";
936 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
942 std::stringstream
msg;
943 msg <<
"Dropped events count can only be retrieved from a valid, ";
944 msg <<
"complete Event message.";
945 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
951 std::stringstream
msg;
952 msg <<
"Dropped events count can only be added to a valid, ";
953 msg <<
"complete Event message.";
954 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
959 std::stringstream
msg;
960 msg <<
"A run number is only available from a valid, ";
961 msg <<
"complete EVENT or ERROR_EVENT message.";
962 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
967 std::stringstream
msg;
968 msg <<
"A luminosity section is only available from a valid, ";
969 msg <<
"complete EVENT or ERROR_EVENT message.";
970 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
975 std::stringstream
msg;
976 msg <<
"An event number is only available from a valid, ";
977 msg <<
"complete EVENT or ERROR_EVENT message.";
978 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
TimePoint_t getCurrentTime()
virtual void do_hltTriggerSelections(Strings &nameList) const
bool validateFragmentIndexAndCount(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
size_t memoryUsed() const
unsigned char * dataLocation(int fragmentIndex) const
unsigned short i2oMessageCode_
std::vector< StreamID > const & getStreamTags() const
virtual uint32_t do_lumiSection() const
unsigned long headerSize() const
uint32_t runNumber() const
unsigned char * headerLocation() const
virtual void do_l1TriggerNames(Strings &nameList) const
bool validateMessageSize(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
void hltTriggerNames(Strings &nameList) const
toolbox::mem::Reference * ref_
std::vector< QueueID > QueueIDs
unsigned long * getBufferData() const
std::string hltURL() const
uint32_t adler32Checksum() const
std::vector< StreamID > streamTags_
utils::TimePoint_t creationTime_
void addFirstFragment(toolbox::mem::Reference *)
virtual uint32_t do_outputModuleId() const
void assertRunNumber(uint32_t runNumber)
uint32_t outputModuleId() const
virtual void do_hltTriggerNames(Strings &nameList) const
void addToChain(ChainData const &)
unsigned int getFragmentID(int fragmentIndex) const
unsigned int expectedNumberOfFragments_
void setDroppedEventsCount(unsigned int)
utils::TimePoint_t lastFragmentTime_
bool validateDataLocation(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
virtual uint32_t do_eventNumber() const
ChainData(unsigned short i2oMessageCode=0x9999, unsigned int messageCode=Header::INVALID)
virtual unsigned int do_droppedEventsCount() const
unsigned int copyFragmentsIntoBuffer(std::vector< unsigned char > &buff) const
void swap(ChainData &other)
bool validateAdler32Checksum()
virtual void do_setDroppedEventsCount(unsigned int)
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
uint32_t eventNumber() const
bool validateExpectedFragmentCount(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
QueueIDs const & getEventConsumerTags() const
bool isEndOfLumiSectionMessage() const
virtual void do_hltTriggerBits(std::vector< unsigned char > &bitList) const
unsigned int offset(bool)
utils::TimePoint_t creationTime() const
uint32_t lumiSection() const
void hltTriggerSelections(Strings &nameList) const
uint32_t nExpectedEPs() const
virtual uint32_t do_hltTriggerCount() const
virtual DQMKey do_dqmKey() const
unsigned long totalDataSize() const
void tagForDQMEventConsumer(QueueID)
uint32_t hltTriggerCount() const
void tagForEventConsumer(QueueID)
void checkForCompleteness()
unsigned int fragmentCount_
void hltTriggerBits(std::vector< unsigned char > &bitList) const
std::string topFolderName() const
QueueIDs const & getDQMEventConsumerTags() const
uint32_t calculateAdler32() const
virtual std::string do_outputModuleLabel() const
bool validateMessageCode(toolbox::mem::Reference *ref, unsigned short expectedI2OMessageCode)
unsigned long dataSize(int fragmentIndex) const
unsigned int droppedEventsCount() const
QueueIDs eventConsumerTags_
char hltURL[MAX_I2O_SM_URLCHARS]
virtual std::string do_topFolderName() const
char hltClassName[MAX_I2O_SM_URLCHARS]
void tagForStream(StreamID)
unsigned int messageCode_
std::string hltClassName() const
#define MAX_I2O_SM_URLCHARS
virtual uint32_t do_nExpectedEPs() const
virtual uint32_t do_runNumber() const
std::string outputModuleLabel() const
QueueIDs dqmEventConsumerTags_
bool validateFragmentOrder(toolbox::mem::Reference *ref, int &indexValue)
tuple size
Write out results.
std::string className(const T &t)
unsigned int faultyBits() const
utils::TimePoint_t staleWindowStartTime_
void l1TriggerNames(Strings &nameList) const