CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdReadv.cc
Go to the documentation of this file.
1 
2 /*
3  * These functions are a re-implementation of upstream's readv.
4  * The important aspect is we have vectored scatter-gather IO.
5  * In the upstream readv, the vectored IO goes into one buffer -
6  * not scatter gathered.
7  *
8  * CMSSW now uses scatter-gather in the TFileAdapter's ReadRepacker.
9  * Hence, we have to emulate it using XrdClient::ReadV - horribly slow!
10  *
11  * Why not continue to use the XrdClient's internal cache? Each time we use a
12  * different TTC, it invalidates the cache. So, the internal cache and our
13  * trigger-pattern TTC usage forces a use of readv instead of prefetch.
14  *
15  */
16 
18 #include "XProtocol/XProtocol.hh"
19 #include "XrdClient/XrdClientConn.hh"
20 #include "XrdClient/XrdClientProtocol.hh"
21 #include "XrdClient/XrdClientConst.hh"
22 #include "XrdClient/XrdClientSid.hh"
23 #include "XrdClient/XrdClientEnv.hh"
26 
27 #include <assert.h>
28 
// Scoped guard for a pthread mutex: the constructor locks the mutex it is
// given and the destructor unlocks it, so the lock is released on every exit
// path from the enclosing scope.
class MutexSentry
{
public:
  // Locks `mutex`; the caller must keep the mutex alive for the sentry's lifetime,
  // because only a reference to it is stored.
  explicit MutexSentry(pthread_mutex_t &mutex) : m_mutex(mutex) { pthread_mutex_lock(&m_mutex); }

  // Unlocks the mutex locked by the constructor.
  ~MutexSentry() { pthread_mutex_unlock(&m_mutex); }

private:
  // Not copyable: a copy's destructor would unlock the same mutex a second time.
  // Declared but intentionally left undefined.
  MutexSentry(const MutexSentry &);
  MutexSentry &operator=(const MutexSentry &);

  pthread_mutex_t &m_mutex;
};
40 
41 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
42 IOSize
44 {
45  vector<IOPosBuffer> new_buf;
46  new_buf.reserve(n);
47  IOOffset off = 0;
48  for (IOSize i=0; i<n; i++) {
49  IOSize size = into[i].size();
50  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
51  off += size;
52  }
53  return readv(&(new_buf[0]), n);
54 }
55 
56 /*
57  * A vectored scatter-gather read.
58  * Returns the total number of bytes successfully read.
59  *
60  */
61 IOSize
63 {
// NOTE(review): listing line 62 is absent from this extract; it presumably carried the
// signature `XrdFile::readv(IOPosBuffer *into, IOSize n)` - confirm against the real file.
//
// Overview: each of the n requested buffers is cut into pieces of at most
// READV_MAXCHUNKSIZE bytes. The pieces are queued in read_chunk_list / result_list and
// handed to readv_send() whenever READV_MAXCHUNKS pieces are queued, plus once more at
// the end for any remainder. An exception is thrown if a request exceeds 0x7fffffff
// bytes or if readv_send() returns a byte count different from the one requested.
64  assert(m_client);
65 
66  // A trivial vector read - unlikely, considering ROOT data format.
67  if (unlikely(n == 0)) {
68  return 0;
69  }
70  if (unlikely(n == 1)) {
71  return read(into[0].data(), into[0].size(), into[0].offset());
72  }
73 
74  XrdClientConn *xrdc = m_client->GetClientConn();
75  if (xrdc) {xrdc->SetOpTimeLimit(EnvGetLong(NAME_TRANSACTIONTIMEOUT));}
76 
77  // The main challenge here is to turn the request into a form that can be
78  // fed to the Xrootd connection. In particular, Xrootd has a limit on the
79  // maximum number of chunks and the maximum size of each chunk. Hence, the
80  // loop unrolling.
81  //
// total_len accumulates bytes over all batches; readv_total_len and chunk_off describe
// only the batch currently being assembled and are reset after each readv_send().
82  IOSize total_len = 0;
83  readahead_list read_chunk_list[READV_MAXCHUNKS];
84  char *result_list[READV_MAXCHUNKS];
85  IOSize chunk_off = 0;
86  IOSize readv_total_len = 0;
87  const char * handle = m_client->GetHandle(); // also - 16 bytes offset from the location of m_client.
88  for (IOSize i = 0; i < n; ++i) {
89 
90  IOSize len = into[i].size();
91  if (unlikely(len > 0x7fffffff)) {
// NOTE(review): the declaration of `ex` (listing line 92) is absent from this extract.
93  ex << "XrdFile::readv(name='" << m_name << "')[" << i
94  << "].size=" << len << " exceeds read size limit 0x7fffffff";
95  ex.addContext("Calling XrdFile::readv()");
96  addConnection(ex);
97  throw ex;
98  }
99 
100  IOOffset off = into[i].offset();
101  char *chunk_data = static_cast<char *>(into[i].data());
102  while (len > 0) { // Iterate as long as there is additional data to read.
103  // Each iteration will read up to READV_MAXCHUNKSIZE of this request.
104  IOSize chunk_size = len > READV_MAXCHUNKSIZE ? READV_MAXCHUNKSIZE : len;
105  len -= chunk_size;
106  readv_total_len += chunk_size;
107  read_chunk_list[chunk_off].rlen = chunk_size;
108  read_chunk_list[chunk_off].offset = off;
109  result_list[chunk_off] = chunk_data;
110  chunk_data += chunk_size;
111  off += chunk_size;
// Copy the first 4 bytes of the file handle into this chunk's fhandle field.
112  memcpy(&(read_chunk_list[chunk_off].fhandle), handle, 4);
113  chunk_off++;
114  if (chunk_off == READV_MAXCHUNKS) {
115  // Now that we have broken the readv into Xrootd-sized chunks, send the actual command.
116  // readv_send will also parse the response and place the data into the result_list buffers.
117  IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
118  total_len += tmp_total_len;
119  if (unlikely(tmp_total_len != readv_total_len))
120  {
// NOTE(review): the declaration of `ex` (listing line 121) is absent from this extract.
122  ex << "XrdFile::readv(name='" << m_name << "')"
123  << ".size=" << n << " Chunk of " << readv_total_len << " requested but "
124  << tmp_total_len << " bytes returned by server.";
125  ex.addContext("Calling XrdFile::readv()");
126  addConnection(ex);
127  throw ex;
128  }
// Batch sent successfully: start assembling the next one from an empty list.
129  readv_total_len = 0;
130  chunk_off = 0;
131  }
132  }
133  }
134  // Do the actual readv for all remaining chunks.
135  if (chunk_off) {
136  IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
137  if (unlikely(tmp_total_len != readv_total_len))
138  {
// NOTE(review): the declaration of `ex` (listing line 139) is absent from this extract.
140  ex << "XrdFile::readv(name='" << m_name << "')"
141  << ".size=" << n << " Chunk of " << readv_total_len << " requested but "
142  << tmp_total_len << " bytes returned by server.";
143  ex.addContext("Calling XrdFile::readv()");
144  addConnection(ex);
145  throw ex;
146  }
147  total_len += tmp_total_len;
148  }
149  return total_len;
150 }
151 
152 /*
153  * Send the readv request to Xrootd.
154  * Returns the number of bytes stored into result_list.
155  * Assumes that read_chunk_list and result_list are of size n; the results of the reads
156  * described in read_chunk_list will be stored in the buffer pointed to by result_list.
157  * total_len should be the sum of the size of all reads.
158  */
159 IOSize
160 XrdFile::readv_send(char **result_list, readahead_list &read_chunk_list, IOSize n, IOSize total_len)
161 {
162  // Per the xrootd protocol document:
163  // Sending requests using the same streamid when a kXR_oksofar status code has been
164  // returned may produced unpredictable results. A client must serialize all requests
165  // using the streamid in the presence of partial results.
166 
167  XrdClientConn *xrdc = m_client->GetClientConn();
168  ClientRequest readvFileRequest;
169  memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
170 
171  kXR_unt16 sid = ConnectionManager->SidManager()->GetNewSid();
172  memcpy(readvFileRequest.header.streamid, &sid, sizeof(kXR_unt16));
173  readvFileRequest.header.requestid = kXR_readv;
174  readvFileRequest.readv.dlen = n * sizeof(struct readahead_list);
175 
176  std::vector<char> res_buf;
177  res_buf.reserve( total_len + (n * sizeof(struct readahead_list)) );
178 
179  // Encode, then send the command.
180  clientMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
181  bool success;
182  IOSize data_length;
183  {
184  MutexSentry sentry(m_readv_mutex);
185  success = xrdc->SendGenCommand(&readvFileRequest, &read_chunk_list, 0,
186  (void *)&(res_buf[0]), FALSE, (char *)"ReadV");
187  data_length = xrdc->LastServerResp.dlen;
188  }
189  clientUnMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
190 
191  ConnectionManager->SidManager()->ReleaseSid(sid);
192 
193  if (success) {
194  return readv_unpack(result_list, res_buf, data_length, read_chunk_list, n);
195  } else {
196  return 0;
197  }
198 
199 }
200 
201 /*
202  * Unpack the response buffer from Xrootd into the final results buffer.
203  */
204 IOSize
205 XrdFile::readv_unpack(char **result_list, std::vector<char> &result_buf, IOSize response_length, readahead_list &read_chunk_list, IOSize n)
206 {
// Walks n consecutive (readahead_list header, data) records in result_buf. For each
// record the header's offset and rlen are converted from network order and compared
// with entry i of the request; the data that follows the header is then copied to
// result_list[i]. Returns the total number of data bytes copied.
// read_chunk_list refers to the first element of the caller's array, so
// (&read_chunk_list)[i] indexes that array.
207  IOSize response_offset = 0;
208  IOSize total_len = 0;
209  for (IOSize i = 0; i < n; i++) {
210 
211  if (unlikely(response_offset + sizeof(struct readahead_list) > response_length)) {
// NOTE(review): the declaration of `ex` (listing line 212) is absent from this extract.
// NOTE(review): no `throw ex;` follows in this branch, unlike the offset/size mismatch
// branches below, so execution continues and the header is read anyway - looks like
// an omission; confirm.
213  ex << "XrdFile::readv(name='" << m_name << "')[" << i
214  << "] returned an incorrectly-sized response (short header)";
215  ex.addContext("Calling XrdFile::readv()");
216  addConnection(ex);
217  }
218 
219  kXR_int64 offset;
220  kXR_int32 rlen;
221  { // Done as a separate block so response is not used later - as it is all in network order!
222  const readahead_list *response = reinterpret_cast<struct readahead_list*>(&result_buf[response_offset]);
223  offset = ntohll(response->offset);
224  rlen = ntohl(response->rlen);
225  }
226 
227  // Sanity / consistency checks; verify the results correspond to the requested chunk
228  // Also check that the response buffer is sufficiently large to read from.
229  if (unlikely((&read_chunk_list)[i].offset != offset)) {
// NOTE(review): the declaration of `ex` (listing line 230) is absent from this extract.
231  ex << "XrdFile::readv(name='" << m_name << "')[" << i
232  << "] returned offset " << offset << " does not match requested offset "
233  << (&read_chunk_list)[i].offset;
234  ex.addContext("Calling XrdFile::readv()");
235  addConnection(ex);
236  throw ex;
237  }
238  if (unlikely((&read_chunk_list)[i].rlen != rlen)) {
// NOTE(review): the declaration of `ex` (listing line 239) is absent from this extract.
240  ex << "XrdFile::readv(name='" << m_name << "')[" << i
241  << "] returned size " << rlen << " does not match requested size "
242  << (&read_chunk_list)[i].rlen;
243  ex.addContext("Calling XrdFile::readv()");
244  addConnection(ex);
245  throw ex;
246  }
247  if (unlikely(response_offset + rlen > response_length)) {
// NOTE(review): the declaration of `ex` (listing line 248) is absent from this extract.
// NOTE(review): this test runs before response_offset is advanced past the header
// (done just below), so it does not account for sizeof(struct readahead_list); it
// also has no `throw ex;`, so the memcpy below still runs - confirm both.
249  ex << "XrdFile::readv(name='" << m_name << "')[" << i
250  << "] returned an incorrectly-sized response (short data)";
251  ex.addContext("Calling XrdFile::readv()");
252  addConnection(ex);
253  }
254 
255  response_offset += sizeof(struct readahead_list); // Data is stored after header.
256  total_len += rlen;
257  // Copy the data into place; increase the offset.
258  memcpy(result_list[i], &result_buf[response_offset], rlen);
259  response_offset += rlen;
260  }
261 
262  return total_len;
263 }
264 
int i
Definition: DBlmapReader.cc:9
static boost::mutex mutex
Definition: LHEProxy.cc:11
IOSize readv_send(char **result_buffer, readahead_list &read_chunk_list, IOSize n, IOSize total_len)
Definition: XrdReadv.cc:160
#define unlikely(x)
Definition: Likely.h:21
~MutexSentry()
Definition: XrdReadv.cc:34
tuple handle
Definition: patZpeak.py:22
virtual IOSize readv(IOBuffer *into, IOSize n)
Definition: XrdReadv.cc:43
int read(void)
Definition: IOInput.cc:54
unsigned int offset(bool)
XrdClient * m_client
Definition: XrdFile.h:61
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOBuffer.h:50
std::string m_name
Definition: XrdFile.h:65
pthread_mutex_t & m_mutex
Definition: XrdReadv.cc:37
IOSize size(void) const
Definition: IOPosBuffer.h:64
virtual IOOffset size(void) const
Definition: Storage.cc:102
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
pthread_mutex_t m_readv_mutex
Definition: XrdFile.h:68
IOSize readv_unpack(char **result_buffer, std::vector< char > &res_buf, IOSize datalen, readahead_list &read_chunk_list, IOSize n)
Definition: XrdReadv.cc:205
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
void addConnection(cms::Exception &)
Definition: XrdFile.cc:407
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of FALSE
MutexSentry(pthread_mutex_t &mutex)
Definition: XrdReadv.cc:32