CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
MessageReceiverForSource.cc
Go to the documentation of this file.
1 // -*- C++ -*-
2 //
3 // Package: Framework
4 // Class : MessageReceiverForSource
5 //
6 // Implementation:
7 // [Notes on implementation]
8 //
9 // Original Author: Chris Jones
10 // Created: Thu Dec 30 10:09:50 CST 2010
11 //
12 
13 // system include files
14 #include <sys/types.h>
15 #include <sys/ipc.h>
16 #include <sys/msg.h>
17 #include <sys/socket.h>
18 #include <errno.h>
19 #include <string.h>
20 
21 // user include files
24 #include "MessageForSource.h"
25 #include "MessageForParent.h"
26 
27 using namespace edm::multicore;
28 //
29 // constants, enums and typedefs
30 //
31 
32 //
33 // static data member definitions
34 //
35 
36 //
37 // constructors and destructor
38 //
39 MessageReceiverForSource::MessageReceiverForSource(int parentSocket, int parentPipe) :
40 m_parentSocket(parentSocket),
41 m_parentPipe(parentPipe),
42 m_maxFd(parentPipe),
43 m_startIndex(0),
44 m_numberOfConsecutiveIndices(0),
45 m_numberToSkip(0)
46 {
47  if (parentSocket > parentPipe) {
48  m_maxFd = parentSocket;
49  }
50  m_maxFd++;
51 }
52 
53 // MessageReceiverForSource::MessageReceiverForSource(const MessageReceiverForSource& rhs)
54 // {
55 // // do actual copying here;
56 // }
57 
58 //MessageReceiverForSource::~MessageReceiverForSource()
59 //{
60 //}
61 
62 //
63 // assignment operators
64 //
65 // const MessageReceiverForSource& MessageReceiverForSource::operator=(const MessageReceiverForSource& rhs)
66 // {
67 // //An exception safe implementation is
68 // MessageReceiverForSource temp(rhs);
69 // swap(rhs);
70 //
71 // return *this;
72 // }
73 
74 //
75 // member functions
76 //
77 /*
78  * The child side of the parent-child communication. See MessageSenderForSource in
79  * EventProcessor.cc for more information.
80  *
81  * If the parent terminates before/during send, the send will immediately fail.
82  * If the parent terminates after send and before recv is successful, the recv may hang.
83  * Hence, this socket has been marked as non-blocking and we wait until select indicate
84  * data is available to read. If the select times out, we write a byte on the watchdog
85  * pipe. The byte is meaningless - we are just testing to see if there is an error
86  * (as EPIPE will be returned if the parent has exited).
87  *
88  * Note: if the parent dies between send/recv, it may take the child up to a second to
89  * notice. This was the best I could do without adding another watchdog pipe.
90  */
91 
92 void
94 {
95  unsigned long previousStartIndex = m_startIndex;
96  unsigned long previousConsecutiveIndices = m_numberOfConsecutiveIndices;
97 
98  //request more work from the parent
99  ssize_t rc, rc2;
100 
101  {
102  MessageForParent parentMessage;
103  errno = 0;
104 
105  // If parent has died, this will fail with "connection refused"
106  if ((rc = send(m_parentSocket, reinterpret_cast<char *>(&parentMessage), parentMessage.sizeForBuffer(), 0)) != static_cast<int>(parentMessage.sizeForBuffer())) {
108  m_startIndex=0;
109  if (rc == -1) {
110  throw cms::Exception("MulticoreCommunicationFailure") << "failed to send data to controller: errno=" << errno << " : " << strerror(errno);
111  }
112  throw cms::Exception("MulticoreCommunicationFailure") << "Unable to write full message to controller (" << rc << " of " << parentMessage.sizeForBuffer() << " byte written)";
113  }
114  }
115 
116  {
118  fd_set readSockets, errorSockets;
119  errno = 0;
120 
121  do {
122  // We reset the FD set after each select, as select changes the sets we pass it.
123  FD_ZERO(&errorSockets); FD_SET(m_parentPipe, &errorSockets); FD_SET(m_parentSocket, &errorSockets);
124  FD_ZERO(&readSockets); FD_SET(m_parentSocket, &readSockets);
125  struct timeval tv; tv.tv_sec = 1; tv.tv_usec = 0;
126  while (((rc = select(m_maxFd, &readSockets, NULL, &errorSockets, &tv)) < 0) && (errno == EINTR)) {}
127 
128  if (rc == 0) {
129  // If we timeout waiting for the parent, this will detect if the parent is still alive.
130  while (((rc2 = write(m_parentPipe, "\0", 1)) < 0) && (errno == EINTR)) {}
131  if (rc2 < 0) {
132  if (errno == EPIPE) {
133  throw cms::Exception("MulticoreCommunicationFailure") << "Parent process appears to be dead.";
134  }
135  throw cms::Exception("MulticoreCommunicationFailure") << "Cannot write to parent: errno=" << errno << " : " << strerror(errno);
136  }
137  }
138  } while (rc == 0);
139 
140  // Check for errors
141  if (FD_ISSET(m_parentSocket, &errorSockets) || FD_ISSET(m_parentPipe, &errorSockets)) {
142  throw cms::Exception("MulticoreCommunicationFailure") << "Cannot communicate with parent (fd=" << m_parentSocket << "): errno=" << errno << " : " << strerror(errno);
143  }
144 
145  if (!FD_ISSET(m_parentSocket, &readSockets)) {
146  throw cms::Exception("MulticoreCommunicationFailure") << "Unable to read from parent socket";
147  }
148 
149  // Note the parent can die between the send and recv; in this case, the recv will hang
150  // forever. Thus, this socket has been marked as non-blocking. According to man pages, it's possible
151  // for no data to be recieved after select indicates it's ready. The size check below will catch this,
152  // but not recover from it. The various edge cases seemed esoteric enough to not care.
153  rc = recv(m_parentSocket, &message, MessageForSource::sizeForBuffer(), 0);
154  if (rc < 0) {
156  m_startIndex=0;
157  throw cms::Exception("MulticoreCommunicationFailure")<<"failed to receive data from controller: errno="<<errno<<" : "<<strerror(errno);
158  }
159 
160  if (rc != (int)MessageForSource::sizeForBuffer()) {
162  m_startIndex=0;
163  throw cms::Exception("MulticoreCommunicationFailure")<<"Incorrect number of bytes received from controller (got " << rc << ", expected " << MessageForSource::sizeForBuffer() << ")";
164  }
165 
166  m_startIndex = message.startIndex;
168  m_numberToSkip = m_startIndex-previousStartIndex-previousConsecutiveIndices;
169 
170  //printf("Start index: %lu, number consecutive: %lu, number to skip: %lu\n", m_startIndex, m_numberOfConsecutiveIndices, m_numberToSkip);
171  }
172 
173  return;
174 }
175 
176 //
177 // const member functions
178 //
179 
180 //
181 // static member functions
182 //
#define NULL
Definition: scimark2.h:8
MessageReceiverForSource(int parentSocket, int parentPipe)
Takes the fd of the read and write socket for communication with parent.