CMS 3D CMS Logo

/afs/cern.ch/work/a/aaltunda/public/www/CMSSW_5_3_13_patch3/src/FWCore/Framework/src/MessageReceiverForSource.cc

Go to the documentation of this file.
00001 // -*- C++ -*-
00002 //
00003 // Package:     Framework
00004 // Class  :     MessageReceiverForSource
00005 // 
00006 // Implementation:
00007 //     [Notes on implementation]
00008 //
00009 // Original Author:  Chris Jones
00010 //         Created:  Thu Dec 30 10:09:50 CST 2010
00011 //
00012 
00013 // system include files
00014 #include <sys/types.h>
00015 #include <sys/ipc.h>
00016 #include <sys/msg.h>
00017 #include <sys/socket.h>
00018 #include <errno.h>
00019 #include <string.h>
00020 
00021 // user include files
00022 #include "FWCore/Framework/interface/MessageReceiverForSource.h"
00023 #include "FWCore/Utilities/interface/Exception.h"
00024 #include "MessageForSource.h"
00025 #include "MessageForParent.h"
00026 
00027 using namespace edm::multicore;
00028 //
00029 // constants, enums and typedefs
00030 //
00031 
00032 //
00033 // static data member definitions
00034 //
00035 
00036 //
00037 // constructors and destructor
00038 //
00039 MessageReceiverForSource::MessageReceiverForSource(int parentSocket, int parentPipe) :
00040 m_parentSocket(parentSocket),
00041 m_parentPipe(parentPipe),
00042 m_maxFd(parentPipe),
00043 m_startIndex(0),
00044 m_numberOfConsecutiveIndices(0),
00045 m_numberToSkip(0)
00046 {
00047    if (parentSocket > parentPipe) {
00048       m_maxFd = parentSocket;
00049    }
00050    m_maxFd++;
00051 }
00052 
00053 // MessageReceiverForSource::MessageReceiverForSource(const MessageReceiverForSource& rhs)
00054 // {
00055 //    // do actual copying here;
00056 // }
00057 
00058 //MessageReceiverForSource::~MessageReceiverForSource()
00059 //{
00060 //}
00061 
00062 //
00063 // assignment operators
00064 //
00065 // const MessageReceiverForSource& MessageReceiverForSource::operator=(const MessageReceiverForSource& rhs)
00066 // {
00067 //   //An exception safe implementation is
00068 //   MessageReceiverForSource temp(rhs);
00069 //   swap(rhs);
00070 //
00071 //   return *this;
00072 // }
00073 
00074 //
00075 // member functions
00076 //
00077 /*
00078  * The child side of the parent-child communication.  See MessageSenderForSource in 
00079  * EventProcessor.cc for more information.
00080  *
00081  * If the parent terminates before/during send, the send will immediately fail.
00082  * If the parent terminates after send and before recv is successful, the recv may hang.
00083  * Hence, this socket has been marked as non-blocking and we wait until select indicates
00084  * data is available to read.  If the select times out, we write a byte on the watchdog
00085  * pipe.  The byte is meaningless - we are just testing to see if there is an error
00086  * (as EPIPE will be returned if the parent has exited).
00087  *
00088  * Note: if the parent dies between send/recv, it may take the child up to a second to
00089  * notice.  This was the best I could do without adding another watchdog pipe.
00090  */
00091 
00092 void 
00093 MessageReceiverForSource::receive()
00094 {
00095    unsigned long previousStartIndex = m_startIndex;
00096    unsigned long previousConsecutiveIndices = m_numberOfConsecutiveIndices;
00097   
00098    //request more work from the parent
00099    ssize_t rc, rc2;
00100 
00101    {
00102       MessageForParent parentMessage;
00103       errno = 0;
00104 
00105       // If parent has died, this will fail with "connection refused"
00106       if ((rc = send(m_parentSocket, reinterpret_cast<char *>(&parentMessage), parentMessage.sizeForBuffer(), 0)) != static_cast<int>(parentMessage.sizeForBuffer())) {
00107          m_numberOfConsecutiveIndices=0;
00108          m_startIndex=0;
00109          if (rc == -1) {
00110             throw cms::Exception("MulticoreCommunicationFailure") << "failed to send data to controller: errno=" << errno << " : " << strerror(errno);
00111          }
00112          throw cms::Exception("MulticoreCommunicationFailure") << "Unable to write full message to controller (" << rc << " of " << parentMessage.sizeForBuffer() << " byte written)";
00113       }
00114    }
00115 
00116    {  
00117       MessageForSource message;
00118       fd_set readSockets, errorSockets;
00119       errno = 0;
00120 
00121       do {
00122          // We reset the FD set after each select, as select changes the sets we pass it.
00123          FD_ZERO(&errorSockets); FD_SET(m_parentPipe, &errorSockets); FD_SET(m_parentSocket, &errorSockets);
00124          FD_ZERO(&readSockets); FD_SET(m_parentSocket, &readSockets);
00125          struct timeval tv; tv.tv_sec = 1; tv.tv_usec = 0;
00126          while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, &tv)) < 0) && (errno == EINTR)) {}
00127 
00128          if (rc == 0) {
00129              // If we timeout waiting for the parent, this will detect if the parent is still alive.
00130              while (((rc2 = write(m_parentPipe, "\0", 1)) < 0) && (errno == EINTR)) {}
00131              if (rc2 < 0) {
00132                 if (errno == EPIPE) {
00133                    throw cms::Exception("MulticoreCommunicationFailure") << "Parent process appears to be dead.";
00134                 }
00135                 throw cms::Exception("MulticoreCommunicationFailure") << "Cannot write to parent:  errno=" << errno << " : " << strerror(errno);
00136              }
00137          }
00138       } while (rc == 0);
00139 
00140       // Check for errors
00141       if (FD_ISSET(m_parentSocket, &errorSockets) || FD_ISSET(m_parentPipe, &errorSockets)) {
00142          throw cms::Exception("MulticoreCommunicationFailure") << "Cannot communicate with parent (fd=" << m_parentSocket << "): errno=" << errno << " : " << strerror(errno);
00143       }
00144 
00145       if (!FD_ISSET(m_parentSocket, &readSockets)) {
00146          throw cms::Exception("MulticoreCommunicationFailure") << "Unable to read from parent socket";
00147       }
00148 
00149       // Note the parent can die between the send and recv; in this case, the recv will hang
00150       // forever.  Thus, this socket has been marked as non-blocking.  According to man pages, it's possible
00151       // for no data to be recieved after select indicates it's ready.  The size check below will catch this,
00152       // but not recover from it.  The various edge cases seemed esoteric enough to not care.
00153       rc = recv(m_parentSocket, &message, MessageForSource::sizeForBuffer(), 0);
00154       if (rc < 0) {
00155          m_numberOfConsecutiveIndices=0;
00156          m_startIndex=0;
00157          throw cms::Exception("MulticoreCommunicationFailure")<<"failed to receive data from controller: errno="<<errno<<" : "<<strerror(errno);
00158       }
00159 
00160       if (rc != (int)MessageForSource::sizeForBuffer()) {
00161          m_numberOfConsecutiveIndices=0;
00162          m_startIndex=0;
00163          throw cms::Exception("MulticoreCommunicationFailure")<<"Incorrect number of bytes received from controller (got " << rc << ", expected " << MessageForSource::sizeForBuffer() << ")";
00164       }
00165 
00166       m_startIndex = message.startIndex;
00167       m_numberOfConsecutiveIndices = message.nIndices;
00168       m_numberToSkip = m_startIndex-previousStartIndex-previousConsecutiveIndices;
00169 
00170       //printf("Start index: %lu, number consecutive: %lu, number to skip: %lu\n", m_startIndex, m_numberOfConsecutiveIndices, m_numberToSkip);
00171    }
00172 
00173    return;
00174 }
00175 
00176 //
00177 // const member functions
00178 //
00179 
00180 //
00181 // static member functions
00182 //