CMS 3D CMS Logo

/data/doxygen/doxygen-1.7.3/gen/CMSSW_4_2_8/src/FWCore/Modules/src/MulticoreRunLumiEventChecker.cc

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 //
00003 // Package:    Modules
00004 // Class:      MulticoreRunLumiEventChecker
00005 //
00013 //
00014 // Original Author:  Chris Jones
00015 //         Created:  Tue Jun 16 15:42:17 CDT 2009
00016 //
00017 //
00018 
00019 // user include files
00020 #include "DataFormats/Provenance/interface/EventID.h"
00021 #include "FWCore/Framework/interface/EDAnalyzer.h"
00022 #include "FWCore/Framework/interface/Event.h"
00023 #include "FWCore/Framework/interface/LuminosityBlock.h"
00024 #include "FWCore/Framework/interface/MakerMacros.h"
00025 #include "FWCore/Framework/interface/Run.h"
00026 #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
00027 #include "FWCore/ParameterSet/interface/ParameterSet.h"
00028 #include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
00029 #include "FWCore/Utilities/interface/UnixSignalHandlers.h"
00030 
00031 // system include files
00032 #include <boost/shared_ptr.hpp>
00033 
00034 #include <algorithm>
00035 #include <map>
00036 #include <memory>
00037 #include <vector>
00038 #include <signal.h>
00039 #include <sys/ipc.h>
00040 #include <sys/msg.h>
00041 #include <sys/signal.h>
00042 #include <sys/types.h>
00043 #include <sys/wait.h>
00044 #include <unistd.h>
00045 #include <errno.h>
00046 
00047 #include <boost/thread/thread.hpp>
00048 //
00049 // class decleration
00050 //
00051 
00052 class MulticoreRunLumiEventChecker : public edm::EDAnalyzer {
00053 public:
00054    explicit MulticoreRunLumiEventChecker(edm::ParameterSet const&);
00055    ~MulticoreRunLumiEventChecker();
00056     static void fillDescriptions(edm::ConfigurationDescriptions& descriptions);
00057 
00058 private:
00059    virtual void beginJob();
00060    virtual void analyze(edm::Event const&, edm::EventSetup const&);
00061    virtual void endJob();
00062    virtual void postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren);
00063    virtual void preForkReleaseResources();
00064 
00065    virtual void beginRun(edm::Run const& run, edm::EventSetup const& es);
00066    virtual void endRun(edm::Run const& run, edm::EventSetup const& es);
00067    
00068    virtual void beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& es);
00069    virtual void endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const& es);
00070 
00071    void check(edm::EventID const& iID, bool isEvent);
00072    
00073    // ----------member data ---------------------------
00074    std::vector<edm::EventID> ids_;
00075    unsigned int index_;
00076    std::map<edm::EventID, unsigned int> seenIDs_;
00077 
00078    unsigned int multiProcessSequentialEvents_;
00079    unsigned int numberOfEventsLeftBeforeSearch_;
00080    bool mustSearch_;
00081    
00082    boost::shared_ptr<boost::thread> listenerThread_;
00083    int messageQueue_;
00084 };
00085 
00086 //
00087 // constants, enums and typedefs
00088 //
00089 
00090 //
00091 // static data member definitions
00092 //
00093 
00094 //
00095 // constructors and destructor
00096 //
00097 MulticoreRunLumiEventChecker::MulticoreRunLumiEventChecker(edm::ParameterSet const& iConfig) :
00098   ids_(iConfig.getUntrackedParameter<std::vector<edm::EventID> >("eventSequence")),
00099   index_(0),
00100   multiProcessSequentialEvents_(iConfig.getUntrackedParameter<unsigned int>("multiProcessSequentialEvents")),
00101   numberOfEventsLeftBeforeSearch_(0),
00102   mustSearch_(false),
00103   messageQueue_(-1)
00104 {
00105    //now do what ever initialization is needed
00106 }
00107 
00108 
00109 MulticoreRunLumiEventChecker::~MulticoreRunLumiEventChecker() {
00110    // do anything here that needs to be done at desctruction time
00111    // (e.g. close files, deallocate resources etc.)
00112 }
00113 
00114 //
00115 // member functions
00116 //
00117 
00118 namespace {
00119    struct CompareWithoutLumi {
00120       CompareWithoutLumi(edm::EventID const& iThis):
00121       m_this(iThis) {}
00122       bool operator()(edm::EventID const& iOther) {
00123          return m_this.run() == iOther.run() && m_this.event() == iOther.event();
00124       }
00125       edm::EventID m_this;
00126    };
00127    
00128    struct MsgToListener {
00129       long mtype;
00130       edm::EventID id;
00131       MsgToListener():
00132       mtype(MsgToListener::messageType()) {}
00133       static size_t sizeForBuffer() {
00134          return sizeof(MsgToListener)-sizeof(long);
00135       }
00136       static long messageType() { return 10;}
00137    };
00138    
00139    class Listener {
00140    public:
00141       Listener(std::map<edm::EventID, unsigned int>* iToFill, int iQueueID, unsigned int iMaxChildren):
00142       fill_(iToFill),
00143       queueID_(iQueueID),
00144       maxChildren_(iMaxChildren),
00145       stoppedChildren_(0){}
00146       
00147       void operator()(){
00148          for(;;) {
00149             MsgToListener rcvmsg;
00150             if(msgrcv(queueID_, &rcvmsg, MsgToListener::sizeForBuffer(), MsgToListener::messageType(), 0) < 0) {
00151                perror("failed to receive message from controller");
00152                exit(EXIT_FAILURE);
00153             }
00154             if(rcvmsg.id.run() == 0) {
00155                ++stoppedChildren_;
00156                if(stoppedChildren_ == maxChildren_) {
00157                   return;
00158                }
00159                continue;
00160             }
00161             ++((*fill_)[rcvmsg.id]);
00162          }
00163       }
00164       
00165    private:
00166       std::map<edm::EventID, unsigned int>* fill_;
00167       int queueID_;
00168       unsigned int maxChildren_;
00169       unsigned int stoppedChildren_;
00170    };
00171 }
00172 
00173 void
00174 MulticoreRunLumiEventChecker::check(edm::EventID const& iEventID, bool iIsEvent) {
00175    if(mustSearch_) { 
00176       if(0 == numberOfEventsLeftBeforeSearch_) {
00177          if(iIsEvent) {
00178             numberOfEventsLeftBeforeSearch_ = multiProcessSequentialEvents_;
00179          }
00180          //the event must be after the last event in our list since multicore doesn't go backwards
00181          //std::vector<edm::EventID>::iterator itFind= std::find_if(ids_.begin() + index_, ids_.end(), CompareWithoutLumi(iEventID));
00182          std::vector<edm::EventID>::iterator itFind= std::find(ids_.begin() + index_, ids_.end(), iEventID);
00183          if(itFind == ids_.end()) {
00184             throw cms::Exception("MissedEvent") << "The event " << iEventID << "is not in the list.\n";
00185          }
00186          index_ = itFind-ids_.begin();
00187       } 
00188       if(iIsEvent) {
00189          --numberOfEventsLeftBeforeSearch_;
00190       }
00191       MsgToListener sndmsg;
00192       sndmsg.id = iEventID;
00193       errno = 0;
00194       int value = msgsnd(messageQueue_, &sndmsg, MsgToListener::sizeForBuffer(), 0);
00195       if(value != 0) {
00196          throw cms::Exception("MessageFailure") << "Failed to send EventID message " << strerror(errno);
00197       }
00198    }
00199    
00200    if(index_ >= ids_.size()) {
00201       throw cms::Exception("TooManyEvents") << "Was passes " << ids_.size() << " EventIDs but have processed more events than that\n";
00202    }
00203    if(iEventID  != ids_[index_]) {
00204       throw cms::Exception("WrongEvent") << "Was expecting event " << ids_[index_] << " but was given " << iEventID << "\n";
00205    }
00206    ++index_;
00207 }
00208 
00209 // ------------ method called to for each event  ------------
00210 void
00211 MulticoreRunLumiEventChecker::analyze(edm::Event const& iEvent, edm::EventSetup const&) {
00212    check(iEvent.id(), true);
00213 }
00214 
00215 void 
00216 MulticoreRunLumiEventChecker::beginRun(edm::Run const& run, edm::EventSetup const&) {
00217    check(edm::EventID(run.id().run(), 0, 0), false);   
00218 }
00219 void 
00220 MulticoreRunLumiEventChecker::endRun(edm::Run const& run, edm::EventSetup const&) {
00221    check(edm::EventID(run.id().run(), 0, 0), false);   
00222 }
00223 
00224 void 
00225 MulticoreRunLumiEventChecker::beginLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const&) {
00226    check(edm::EventID(lumi.id().run(), lumi.id().luminosityBlock(), 0), false);   
00227 }
00228 
00229 void 
00230 MulticoreRunLumiEventChecker::endLuminosityBlock(edm::LuminosityBlock const& lumi, edm::EventSetup const&) {
00231    check(edm::EventID(lumi.id().run(), lumi.id().luminosityBlock(), 0), false);   
00232 }
00233 
00234 
00235 // ------------ method called once each job just before starting event loop  ------------
00236 void
00237 MulticoreRunLumiEventChecker::beginJob() {
00238 }
00239 
00240 // ------------ method called once each job just after ending the event loop  ------------
00241 void
00242 MulticoreRunLumiEventChecker::endJob() {
00243    if(mustSearch_) {
00244       MsgToListener sndmsg;
00245       sndmsg.id = edm::EventID();
00246       errno = 0;
00247       int value = msgsnd(messageQueue_,&sndmsg, MsgToListener::sizeForBuffer(),0);
00248       if(value != 0) {
00249         throw cms::Exception("MessageFailure")<<"Failed to send finished message "<<strerror(errno)<<"\n";
00250       }
00251    } else {
00252      if(index_ != ids_.size()) {
00253          throw cms::Exception("WrongNumberOfEvents")<<"Saw "<<index_<<" events but was supposed to see "<<ids_.size()<<"\n";
00254      }
00255    }
00256    
00257    if(listenerThread_) {
00258       listenerThread_->join();
00259       msgctl(messageQueue_, IPC_RMID, 0);
00260       
00261       std::set<edm::EventID> uniqueIDs(ids_.begin(), ids_.end());
00262       if(seenIDs_.size() != uniqueIDs.size()) {
00263          throw cms::Exception("WrongNumberOfEvents") << "Saw " << seenIDs_.size() << " events but was supposed to see " << ids_.size() << "\n";
00264       }
00265       
00266       std::set<edm::EventID> duplicates;
00267       for(std::map<edm::EventID, unsigned int>::iterator it = seenIDs_.begin(), itEnd = seenIDs_.end();
00268           it != itEnd;
00269           ++it) {
00270          if(it->second > 1 && it->first.event() != 0) {
00271             duplicates.insert(it->first);
00272          }
00273       }
00274       if(duplicates.size() != 0) {
00275          throw cms::Exception("DuplicateEvents") << "saw " << duplicates.size() << " events\n";
00276       }
00277    }
00278 }
00279 
00280 // ------------ method called once each job for validation
00281 void
00282 MulticoreRunLumiEventChecker::fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
00283   edm::ParameterSetDescription desc;
00284   desc.addUntracked<std::vector<edm::EventID> >("eventSequence");
00285   desc.addUntracked<unsigned int>("multiProcessSequentialEvents", 0U);
00286   descriptions.add("eventIDChecker", desc);
00287 }
00288 
00289 void 
00290 MulticoreRunLumiEventChecker::preForkReleaseResources() {
00291   //This queue is used to communicate between children
00292   messageQueue_ = msgget(IPC_PRIVATE, IPC_CREAT|0660);
00293   if(-1 == messageQueue_) {
00294     throw cms::Exception("FailedToCreateQueue")<<" call to 'msgget' failed to create a message queue. errno: "<<errno<<" "<<strerror(errno);
00295   }
00296 }
00297 
00298 void
00299 MulticoreRunLumiEventChecker::postForkReacquireResources(unsigned int iChildIndex, unsigned int iNumberOfChildren) {
00300    mustSearch_ = true;
00301    
00302    if(0 == iChildIndex) {
00303       //NOTE: must temporarily disable signals so the new thread never tries to process a signal
00304       sigset_t oldset;
00305       edm::disableAllSigs(&oldset);
00306       
00307       Listener listener(&seenIDs_, messageQueue_, iNumberOfChildren);
00308       listenerThread_ = boost::shared_ptr<boost::thread>(new boost::thread(listener)) ;
00309       edm::reenableSigs(&oldset);
00310    }
00311 }
00312 
00313 //define this as a plug-in
00314 DEFINE_FWK_MODULE(MulticoreRunLumiEventChecker);