00001
00002
00003
00004
00005
00013
00014
00015
00016
00017
00018
00019
#include "DataFormats/Provenance/interface/EventID.h"
#include "FWCore/Framework/interface/EDAnalyzer.h"
#include "FWCore/Framework/interface/Event.h"
#include "FWCore/Framework/interface/LuminosityBlock.h"
#include "FWCore/Framework/interface/MakerMacros.h"
#include "FWCore/Framework/interface/Run.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/UnixSignalHandlers.h"

#include <boost/shared_ptr.hpp>

#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <vector>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include <boost/thread/thread.hpp>
00048
00049
00050
00051
00052 class MulticoreRunLumiEventChecker : public edm::EDAnalyzer {
00053 public:
00054 explicit MulticoreRunLumiEventChecker(edm::ParameterSet const&);
00055 ~MulticoreRunLumiEventChecker();
00056 static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
00057
00058 private:
00059 virtual void beginJob();
00060 virtual void analyze(edm::Event const&, edm::EventSetup const&);
00061 virtual void endJob();
00062 virtual void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00063 virtual void preForkReleaseResources();
00064
00065 virtual void beginRun(edm::Run const& run, edm::EventSetup const& es);
00066 virtual void endRun(edm::Run const& run, edm::EventSetup const& es);
00067
00068 virtual void beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& es);
00069 virtual void endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& es);
00070
00071 void check(edm::EventID const& iID, bool isEvent);
00072
00073
00074 std::vector<edm::EventID> ids_;
00075 unsigned int index_;
00076 std::map<edm::EventID, unsigned int> seenIDs_;
00077
00078 unsigned int multiProcessSequentialEvents_;
00079 unsigned int numberOfEventsLeftBeforeSearch_;
00080 bool mustSearch_;
00081
00082 boost::shared_ptr<boost::thread> listenerThread_;
00083 int messageQueue_;
00084 };
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 MulticoreRunLumiEventChecker::MulticoreRunLumiEventChecker(edm::ParameterSet const& iConfig) :
00098 ids_(iConfig.getUntrackedParameter<std::vector<edm::EventID> >("eventSequence")),
00099 index_(0),
00100 multiProcessSequentialEvents_(iConfig.getUntrackedParameter<unsigned int>("multiProcessSequentialEvents")),
00101 numberOfEventsLeftBeforeSearch_(0),
00102 mustSearch_(false),
00103 messageQueue_(-1)
00104 {
00105
00106 }
00107
00108
00109 MulticoreRunLumiEventChecker::~MulticoreRunLumiEventChecker() {
00110
00111
00112 }
00113
00114
00115
00116
00117
00118 namespace {
00119 struct CompareWithoutLumi {
00120 CompareWithoutLumi(edm::EventID const& iThis):
00121 m_this(iThis) {}
00122 bool operator()(edm::EventID const& iOther) {
00123 return m_this.run() == iOther.run() && m_this.event() == iOther.event();
00124 }
00125 edm::EventID m_this;
00126 };
00127
00128 struct MsgToListener {
00129 long mtype;
00130 edm::EventID id;
00131 MsgToListener():
00132 mtype(MsgToListener::messageType()) {}
00133 static size_t sizeForBuffer() {
00134 return sizeof(MsgToListener)-sizeof(long);
00135 }
00136 static long messageType() { return 10;}
00137 };
00138
00139 class Listener {
00140 public:
00141 Listener(std::map<edm::EventID, unsigned int>* iToFill, int iQueueID, unsigned int iMaxChildren):
00142 fill_(iToFill),
00143 queueID_(iQueueID),
00144 maxChildren_(iMaxChildren),
00145 stoppedChildren_(0){}
00146
00147 void operator()(){
00148 for(;;) {
00149 MsgToListener rcvmsg;
00150 if(msgrcv(queueID_, &rcvmsg, MsgToListener::sizeForBuffer(), MsgToListener::messageType(), 0) < 0) {
00151 perror("failed to receive message from controller");
00152 exit(EXIT_FAILURE);
00153 }
00154 if(rcvmsg.id.run() == 0) {
00155 ++stoppedChildren_;
00156 if(stoppedChildren_ == maxChildren_) {
00157 return;
00158 }
00159 continue;
00160 }
00161 ++((*fill_)[rcvmsg.id]);
00162 }
00163 }
00164
00165 private:
00166 std::map<edm::EventID, unsigned int>* fill_;
00167 int queueID_;
00168 unsigned int maxChildren_;
00169 unsigned int stoppedChildren_;
00170 };
00171 }
00172
00173 void
00174 MulticoreRunLumiEventChecker::check(edm::EventID const& iEventID, bool iIsEvent) {
00175 if(mustSearch_) {
00176 if(0 == numberOfEventsLeftBeforeSearch_) {
00177 if(iIsEvent) {
00178 numberOfEventsLeftBeforeSearch_ = multiProcessSequentialEvents_;
00179 }
00180
00181
00182 std::vector<edm::EventID>::iterator itFind= std::find(ids_.begin() + index_, ids_.end(), iEventID);
00183 if(itFind == ids_.end()) {
00184 throw cms::Exception("MissedEvent") << "The event " << iEventID << "is not in the list.\n";
00185 }
00186 index_ = itFind-ids_.begin();
00187 }
00188 if(iIsEvent) {
00189 --numberOfEventsLeftBeforeSearch_;
00190 }
00191 MsgToListener sndmsg;
00192 sndmsg.id = iEventID;
00193 errno = 0;
00194 int value = msgsnd(messageQueue_, &sndmsg, MsgToListener::sizeForBuffer(), 0);
00195 if(value != 0) {
00196 throw cms::Exception("MessageFailure") << "Failed to send EventID message " << strerror(errno);
00197 }
00198 }
00199
00200 if(index_ >= ids_.size()) {
00201 throw cms::Exception("TooManyEvents") << "Was passes " << ids_.size() << " EventIDs but have processed more events than that\n";
00202 }
00203 if(iEventID != ids_[index_]) {
00204 throw cms::Exception("WrongEvent") << "Was expecting event " << ids_[index_] << " but was given " << iEventID << "\n";
00205 }
00206 ++index_;
00207 }
00208
00209
00210 void
00211 MulticoreRunLumiEventChecker::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
00212 check(iEvent.id(), true);
00213 }
00214
00215 void
00216 MulticoreRunLumiEventChecker::beginRun(edm::Run const& run, edm::EventSetup const&) {
00217 check(edm::EventID(run.id().run(), 0, 0), false);
00218 }
00219 void
00220 MulticoreRunLumiEventChecker::endRun(edm::Run const& run, edm::EventSetup const&) {
00221 check(edm::EventID(run.id().run(), 0, 0), false);
00222 }
00223
00224 void
00225 MulticoreRunLumiEventChecker::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const&) {
00226 check(edm::EventID(lumi.id().run(), lumi.id().luminosityBlock(), 0), false);
00227 }
00228
00229 void
00230 MulticoreRunLumiEventChecker::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const&) {
00231 check(edm::EventID(lumi.id().run(), lumi.id().luminosityBlock(), 0), false);
00232 }
00233
00234
00235
00236 void
00237 MulticoreRunLumiEventChecker::beginJob() {
00238 }
00239
00240
00241 void
00242 MulticoreRunLumiEventChecker::endJob() {
00243 if(mustSearch_) {
00244 MsgToListener sndmsg;
00245 sndmsg.id = edm::EventID();
00246 errno = 0;
00247 int value = msgsnd(messageQueue_,&sndmsg, MsgToListener::sizeForBuffer(),0);
00248 if(value != 0) {
00249 throw cms::Exception("MessageFailure")<<"Failed to send finished message "<<strerror(errno)<<"\n";
00250 }
00251 } else {
00252 if(index_ != ids_.size()) {
00253 throw cms::Exception("WrongNumberOfEvents")<<"Saw "<<index_<<" events but was supposed to see "<<ids_.size()<<"\n";
00254 }
00255 }
00256
00257 if(listenerThread_) {
00258 listenerThread_->join();
00259 msgctl(messageQueue_, IPC_RMID, 0);
00260
00261 std::set<edm::EventID> uniqueIDs(ids_.begin(), ids_.end());
00262 if(seenIDs_.size() != uniqueIDs.size()) {
00263 throw cms::Exception("WrongNumberOfEvents") << "Saw " << seenIDs_.size() << " events but was supposed to see " << ids_.size() << "\n";
00264 }
00265
00266 std::set<edm::EventID> duplicates;
00267 for(std::map<edm::EventID, unsigned int>::iterator it = seenIDs_.begin(), itEnd = seenIDs_.end();
00268 it != itEnd;
00269 ++it) {
00270 if(it->second > 1 && it->first.event() != 0) {
00271 duplicates.insert(it->first);
00272 }
00273 }
00274 if(duplicates.size() != 0) {
00275 throw cms::Exception("DuplicateEvents") << "saw " << duplicates.size() << " events\n";
00276 }
00277 }
00278 }
00279
00280
00281 void
00282 MulticoreRunLumiEventChecker::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
00283 edm::ParameterSetDescription desc;
00284 desc.addUntracked<std::vector<edm::EventID> >("eventSequence");
00285 desc.addUntracked<unsigned int>("multiProcessSequentialEvents", 0U);
00286 descriptions.add("eventIDChecker", desc);
00287 }
00288
00289 void
00290 MulticoreRunLumiEventChecker::preForkReleaseResources() {
00291
00292 messageQueue_ = msgget(IPC_PRIVATE, IPC_CREAT|0660);
00293 if(-1 == messageQueue_) {
00294 throw cms::Exception("FailedToCreateQueue")<<" call to 'msgget' failed to create a message queue. errno: "<<errno<<" "<<strerror(errno);
00295 }
00296 }
00297
00298 void
00299 MulticoreRunLumiEventChecker::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
00300 mustSearch_ = true;
00301
00302 if(0 == iChildIndex) {
00303
00304 sigset_t oldset;
00305 edm::disableAllSigs(&oldset);
00306
00307 Listener listener(&seenIDs_, messageQueue_, iNumberOfChildren);
00308 listenerThread_ = boost::shared_ptr<boost::thread>(new boost::thread(listener)) ;
00309 edm::reenableSigs(&oldset);
00310 }
00311 }
00312
00313
// Invokes the DEFINE_FWK_MODULE macro for this class; presumably (macro from
// FWCore/Framework/interface/MakerMacros.h) this registers it as a framework
// module plugin - confirm against that header.
00314 DEFINE_FWK_MODULE(MulticoreRunLumiEventChecker);