17 #include "interface/shared/i2oXFunctionCodes.h"
18 #include "interface/shared/version.h"
28 const unsigned int messageCode) :
31 dqmEventConsumerTags_(),
34 faultyBits_(INCOMPLETE_MESSAGE),
35 messageCode_(messageCode),
36 i2oMessageCode_(i2oMessageCode),
39 expectedNumberOfFragments_(0),
47 #ifdef STOR_DEBUG_CORRUPT_MESSAGES
48 double r =
rand()/
static_cast<double>(RAND_MAX);
56 std::cout <<
"Simulating faulty I2O message" << std::endl;
59 #endif // STOR_DEBUG_CORRUPT_MESSAGES
76 try { ref_->release(); }
94 return (faultyBits_ != 0);
96 return (faultyBits_ != INCOMPLETE_MESSAGE);
107 ((faultyBits_ & INVALID_INITIAL_REFERENCE & ~INCOMPLETE_MESSAGE) == 0) &&
108 ((faultyBits_ & CORRUPT_INITIAL_HEADER & ~INCOMPLETE_MESSAGE) == 0);
113 return ( (faultyBits_ & ~WRONG_CHECKSUM) == 0);
120 XCEPT_RAISE(stor::exception::I2OChain,
"Cannot add a first fragment to a non-empty I2OChain.");
129 fragKey_.secondaryId_ =
static_cast<uint32_t
>(
130 (uintptr_t)pRef->getDataLocation()
135 fragKey_.secondaryId_ =
static_cast<uint32_t
>(
time(0) );
141 lastFragmentTime_ = creationTime_;
142 staleWindowStartTime_ = creationTime_;
146 int workingIndex = -1;
148 if (validateDataLocation(pRef, INVALID_INITIAL_REFERENCE) &&
149 validateMessageSize(pRef, CORRUPT_INITIAL_HEADER) &&
150 validateFragmentIndexAndCount(pRef, CORRUPT_INITIAL_HEADER))
154 expectedNumberOfFragments_ = smMsg->
numFrames;
155 validateFragmentOrder(pRef, workingIndex);
159 toolbox::mem::Reference* curRef = pRef->getNextReference();
164 if (validateDataLocation(curRef, INVALID_SECONDARY_REFERENCE) &&
165 validateMessageSize(curRef, CORRUPT_SECONDARY_HEADER) &&
166 validateFragmentIndexAndCount(curRef, CORRUPT_SECONDARY_HEADER))
168 validateExpectedFragmentCount(curRef, TOTAL_COUNT_MISMATCH);
169 validateFragmentOrder(curRef, workingIndex);
170 validateMessageCode(curRef, i2oMessageCode_);
173 curRef = curRef->getNextReference();
177 checkForCompleteness();
184 addFirstFragment(newpart.
ref_);
188 if (parsable() && newpart.
parsable())
191 toolbox::mem::Reference* newRef = newpart.
ref_;
195 toolbox::mem::Reference* nextNewRef = newRef->getNextReference();
196 newRef->setNextReference(0);
204 if (newRef == newpart.
ref_) {newRef = newpart.
ref_->duplicate();}
207 bool fragmentWasAdded =
false;
216 unsigned int newFragmentTotalCount = thatMsg->
numFrames;
217 if (newFragmentTotalCount != expectedNumberOfFragments_)
219 faultyBits_ |= TOTAL_COUNT_MISMATCH;
225 unsigned int firstIndex = fragMsg->
frameCount;
227 if (newIndex < firstIndex)
229 newRef->setNextReference(ref_);
231 fragmentWasAdded =
true;
238 toolbox::mem::Reference* curRef = ref_;
239 for (
unsigned int idx = 0; idx < fragmentCount_; ++idx)
247 if (newIndex == curIndex)
249 faultyBits_ |= DUPLICATE_FRAGMENT;
250 newRef->setNextReference(curRef->getNextReference());
251 curRef->setNextReference(newRef);
252 fragmentWasAdded =
true;
259 toolbox::mem::Reference* nextRef = curRef->getNextReference();
262 curRef->setNextReference(newRef);
263 fragmentWasAdded =
true;
271 if (newIndex > curIndex && newIndex < nextIndex)
273 newRef->setNextReference(curRef->getNextReference());
274 curRef->setNextReference(newRef);
275 fragmentWasAdded =
true;
284 if (!fragmentWasAdded)
288 XCEPT_RAISE(stor::exception::I2OChain,
289 "A fragment was unable to be added to a chain.");
302 toolbox::mem::Reference* curRef = newpart.
ref_;
305 curRef = curRef->getNextReference();
309 toolbox::mem::Reference* lastRef = ref_;
310 curRef = ref_->getNextReference();
313 curRef = curRef->getNextReference();
315 lastRef->setNextReference(newpart.
ref_->duplicate());
319 staleWindowStartTime_ = lastFragmentTime_;
333 staleWindowStartTime_ = lastFragmentTime_;
339 checkForCompleteness();
344 if ((fragmentCount_ == expectedNumberOfFragments_) &&
345 ((faultyBits_ & (TOTAL_COUNT_MISMATCH | FRAGMENTS_OUT_OF_ORDER | DUPLICATE_FRAGMENT)) == 0))
351 faultyBits_ &= ~INCOMPLETE_MESSAGE;
353 validateAdler32Checksum();
358 faultyBits_ |= EXTERNALLY_REQUESTED;
363 faultyBits_ |= CORRUPT_INITIAL_HEADER;
369 ?
static_cast<unsigned long*
>(ref_->getDataLocation())
393 size_t memoryUsed = 0;
394 toolbox::mem::Reference* curRef = ref_;
397 memoryUsed += curRef->getDataSize();
398 curRef = curRef->getNextReference();
405 unsigned long totalSize = 0;
406 toolbox::mem::Reference* curRef = ref_;
407 for (
unsigned int idx = 0; idx < fragmentCount_; ++idx)
409 I2O_MESSAGE_FRAME *i2oMsg =
410 (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
419 totalSize += (i2oMsg->MessageSize*4);
422 curRef = curRef->getNextReference();
429 toolbox::mem::Reference* curRef = ref_;
430 for (
int idx = 0; idx < fragmentIndex; ++idx)
432 curRef = curRef->getNextReference();
435 I2O_MESSAGE_FRAME *i2oMsg =
436 (I2O_MESSAGE_FRAME*) curRef->getDataLocation();
445 return (i2oMsg->MessageSize*4);
452 toolbox::mem::Reference* curRef = ref_;
453 for (
int idx = 0; idx < fragmentIndex; ++idx)
455 curRef = curRef->getNextReference();
460 return do_fragmentLocation(static_cast<unsigned char*>
461 (curRef->getDataLocation()));
465 return static_cast<unsigned char*
>(curRef->getDataLocation());
471 toolbox::mem::Reference* curRef = ref_;
472 for (
int idx = 0; idx < fragmentIndex; ++idx)
474 curRef = curRef->getNextReference();
492 unsigned long fullSize = totalDataSize();
493 if (targetBuffer.capacity() < fullSize)
495 targetBuffer.resize(fullSize);
497 unsigned char* targetLoc = (
unsigned char*)&targetBuffer[0];
499 toolbox::mem::Reference* curRef = ref_;
502 unsigned char* fragmentLoc =
503 (
unsigned char*) curRef->getDataLocation();
504 unsigned long sourceSize = 0;
505 unsigned char* sourceLoc = 0;
512 sourceLoc = do_fragmentLocation(fragmentLoc);
514 else if (fragmentLoc)
516 I2O_MESSAGE_FRAME *i2oMsg = (I2O_MESSAGE_FRAME*) fragmentLoc;
517 sourceSize = i2oMsg->MessageSize * 4;
518 sourceLoc = fragmentLoc;
523 std::copy(sourceLoc, sourceLoc+sourceSize, targetLoc);
524 targetLoc += sourceSize;
527 curRef = curRef->getNextReference();
530 return static_cast<unsigned int>(fullSize);
535 return do_headerSize();
540 return do_headerLocation();
551 std::string URL(smMsg->
hltURL, size);
556 return "unavailable";
573 return "unavailable";
579 return do_outputModuleId();
585 return do_outputModuleLabel();
590 return do_topFolderName();
600 do_hltTriggerNames(nameList);
605 do_hltTriggerSelections(nameList);
610 do_l1TriggerNames(nameList);
615 return do_hltTriggerCount();
621 do_hltTriggerBits(bitList);
626 do_assertRunNumber(runNumber);
631 return do_runNumber();
636 return do_lumiSection();
641 return do_eventNumber();
646 return do_adler32Checksum();
651 streamTags_.push_back(streamId);
656 eventConsumerTags_.push_back(queueId);
661 dqmEventConsumerTags_.push_back(queueId);
671 return eventConsumerTags_;
676 return dqmEventConsumerTags_;
681 return do_droppedEventsCount();
686 do_setDroppedEventsCount(count);
691 return ( i2oMessageCode_ == I2O_EVM_LUMISECTION );
698 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
699 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
702 faultyBits_ |= maskToUse;
712 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
713 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
714 if ((
size_t)(pvtMsg->StdMessageFrame.MessageSize*4) <
717 faultyBits_ |= maskToUse;
731 faultyBits_ |= maskToUse;
743 if (smMsg->
numFrames != expectedNumberOfFragments_)
745 faultyBits_ |= maskToUse;
755 int problemCount = 0;
758 int thisIndex =
static_cast<int>(smMsg->
frameCount);
759 if (thisIndex == indexValue)
761 faultyBits_ |= DUPLICATE_FRAGMENT;
764 else if (thisIndex < indexValue)
766 faultyBits_ |= FRAGMENTS_OUT_OF_ORDER;
769 indexValue = thisIndex;
770 return (problemCount == 0);
775 unsigned short expectedI2OMessageCode)
777 I2O_PRIVATE_MESSAGE_FRAME *pvtMsg =
778 (I2O_PRIVATE_MESSAGE_FRAME*) ref->getDataLocation();
779 if (pvtMsg->XFunctionCode != expectedI2OMessageCode)
781 faultyBits_ |= CORRUPT_SECONDARY_HEADER;
789 if ( !complete() || !headerOkay() )
return false;
791 const uint32_t expected = adler32Checksum();
792 if (expected == 0)
return false;
794 const uint32_t calculated = calculateAdler32();
796 if ( calculated != expected )
798 faultyBits_ |= WRONG_CHECKSUM;
806 uint32_t adler = adler32(0
L, 0, 0);
808 toolbox::mem::Reference* curRef = ref_;
814 const unsigned long headerSize = do_headerSize();
817 const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
818 if ( headerSize > payloadSize )
821 offset = headerSize - payloadSize;
825 const unsigned char* dataLocation =
826 do_fragmentLocation((
unsigned char*)curRef->getDataLocation()) + headerSize;
827 adler = adler32(adler, dataLocation, smMsg->
dataSize - headerSize);
830 curRef = curRef->getNextReference();
836 const unsigned long payloadSize = curRef->getDataSize() - do_i2oFrameSize();
837 if ( offset > payloadSize )
839 offset -= payloadSize;
843 const unsigned char* dataLocation =
844 do_fragmentLocation((
unsigned char*)curRef->getDataLocation()) + offset;
845 adler = adler32(adler, dataLocation, smMsg->
dataSize - offset);
849 curRef = curRef->getNextReference();
857 std::stringstream
msg;
858 msg <<
"An output module ID is only available from a valid, ";
859 msg <<
"complete INIT or Event message.";
860 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
865 std::stringstream
msg;
866 msg <<
"An output module label is only available from a valid, ";
867 msg <<
"complete INIT message.";
868 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
873 std::stringstream
msg;
874 msg <<
"A top folder name is only available from a valid, ";
875 msg <<
"complete DQM event message.";
876 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
881 std::stringstream
msg;
882 msg <<
"The DQM key is only available from a valid, ";
883 msg <<
"complete DQM event message.";
884 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
889 std::stringstream
msg;
890 msg <<
"The HLT trigger names are only available from a valid, ";
891 msg <<
"complete INIT message.";
892 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
897 std::stringstream
msg;
898 msg <<
"The HLT trigger selections are only available from a valid, ";
899 msg <<
"complete INIT message.";
900 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
905 std::stringstream
msg;
906 msg <<
"The L1 trigger names are only available from a valid, ";
907 msg <<
"complete INIT message.";
908 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
913 std::stringstream
msg;
914 msg <<
"An HLT trigger count is only available from a valid, ";
915 msg <<
"complete Event message.";
916 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
922 std::stringstream
msg;
923 msg <<
"The HLT trigger bits are only available from a valid, ";
924 msg <<
"complete Event message.";
925 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
931 std::stringstream
msg;
932 msg <<
"Dropped events count can only be retrieved from a valid, ";
933 msg <<
"complete Event message.";
934 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
940 std::stringstream
msg;
941 msg <<
"Dropped events count can only be added to a valid, ";
942 msg <<
"complete Event message.";
943 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
948 std::stringstream
msg;
949 msg <<
"A run number is only available from a valid, ";
950 msg <<
"complete EVENT or ERROR_EVENT message.";
951 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
956 std::stringstream
msg;
957 msg <<
"A luminosity section is only available from a valid, ";
958 msg <<
"complete EVENT or ERROR_EVENT message.";
959 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
964 std::stringstream
msg;
965 msg <<
"An event number is only available from a valid, ";
966 msg <<
"complete EVENT or ERROR_EVENT message.";
967 XCEPT_RAISE(stor::exception::WrongI2OMessageType, msg.str());
TimePoint_t getCurrentTime()
virtual void do_hltTriggerSelections(Strings &nameList) const
bool validateFragmentIndexAndCount(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
size_t memoryUsed() const
unsigned char * dataLocation(int fragmentIndex) const
unsigned short i2oMessageCode_
std::vector< StreamID > const & getStreamTags() const
virtual uint32_t do_lumiSection() const
unsigned long headerSize() const
uint32_t runNumber() const
unsigned char * headerLocation() const
virtual void do_l1TriggerNames(Strings &nameList) const
bool validateMessageSize(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
void hltTriggerNames(Strings &nameList) const
toolbox::mem::Reference * ref_
std::vector< QueueID > QueueIDs
unsigned long * getBufferData() const
std::string hltURL() const
uint32_t adler32Checksum() const
std::vector< StreamID > streamTags_
utils::TimePoint_t creationTime_
void addFirstFragment(toolbox::mem::Reference *)
virtual uint32_t do_outputModuleId() const
void assertRunNumber(uint32_t runNumber)
uint32_t outputModuleId() const
virtual void do_hltTriggerNames(Strings &nameList) const
void addToChain(ChainData const &)
unsigned int getFragmentID(int fragmentIndex) const
unsigned int expectedNumberOfFragments_
void setDroppedEventsCount(unsigned int)
utils::TimePoint_t lastFragmentTime_
bool validateDataLocation(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
virtual uint32_t do_eventNumber() const
ChainData(unsigned short i2oMessageCode=0x9999, unsigned int messageCode=Header::INVALID)
virtual unsigned int do_droppedEventsCount() const
unsigned int copyFragmentsIntoBuffer(std::vector< unsigned char > &buff) const
void swap(ChainData &other)
bool validateAdler32Checksum()
virtual void do_setDroppedEventsCount(unsigned int)
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
uint32_t eventNumber() const
bool validateExpectedFragmentCount(toolbox::mem::Reference *ref, BitMasksForFaulty maskToUse)
QueueIDs const & getEventConsumerTags() const
bool isEndOfLumiSectionMessage() const
virtual void do_hltTriggerBits(std::vector< unsigned char > &bitList) const
unsigned int offset(bool)
utils::TimePoint_t creationTime() const
uint32_t lumiSection() const
void hltTriggerSelections(Strings &nameList) const
virtual uint32_t do_hltTriggerCount() const
virtual DQMKey do_dqmKey() const
unsigned long totalDataSize() const
void tagForDQMEventConsumer(QueueID)
uint32_t hltTriggerCount() const
void tagForEventConsumer(QueueID)
void checkForCompleteness()
unsigned int fragmentCount_
void hltTriggerBits(std::vector< unsigned char > &bitList) const
std::string topFolderName() const
QueueIDs const & getDQMEventConsumerTags() const
uint32_t calculateAdler32() const
virtual std::string do_outputModuleLabel() const
bool validateMessageCode(toolbox::mem::Reference *ref, unsigned short expectedI2OMessageCode)
unsigned long dataSize(int fragmentIndex) const
unsigned int droppedEventsCount() const
QueueIDs eventConsumerTags_
char hltURL[MAX_I2O_SM_URLCHARS]
virtual std::string do_topFolderName() const
char hltClassName[MAX_I2O_SM_URLCHARS]
void tagForStream(StreamID)
unsigned int messageCode_
std::string hltClassName() const
#define MAX_I2O_SM_URLCHARS
virtual uint32_t do_runNumber() const
std::string outputModuleLabel() const
QueueIDs dqmEventConsumerTags_
bool validateFragmentOrder(toolbox::mem::Reference *ref, int &indexValue)
tuple size
Write out results.
std::string className(const T &t)
unsigned int faultyBits() const
utils::TimePoint_t staleWindowStartTime_
void l1TriggerNames(Strings &nameList) const