CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
XrdReadv.cc
Go to the documentation of this file.
1 
2 /*
3  * These functions are a re-implementation of upstream's readv.
4  * The important aspect is we have vectored scatter-gather IO.
5  * In the upstream readv, the vectored IO goes into one buffer -
6  * not scatter gathered.
7  *
8  * CMSSW now uses scatter-gather in the TFileAdapter's ReadRepacker.
9  * Hence, we have to emulate it using XrdClient::ReadV - horribly slow!
10  *
11  * Why not continue to use the XrdClient's internal cache? Each time we use a
12  * different TTC, it invalidates the cache. So, the internal cache and our
13  * trigger-pattern TTC usage forces a use of readv instead of prefetch.
14  *
15  */
16 
18 #include "XProtocol/XProtocol.hh"
19 #include "XrdClient/XrdClientProtocol.hh"
20 #include "XrdClient/XrdClientConst.hh"
21 #include "XrdClient/XrdClientSid.hh"
24 
25 #include <assert.h>
26 
/*
 * Scoped guard for a pthread mutex.
 * The constructor locks the mutex it is given and the destructor unlocks it,
 * so the mutex is released whenever the sentry goes out of scope.
 */
class MutexSentry
{
public:
  // explicit: a sentry should never be created by an implicit conversion.
  explicit MutexSentry(pthread_mutex_t &mutex) : m_mutex(mutex) {pthread_mutex_lock(&m_mutex);}

  ~MutexSentry() {pthread_mutex_unlock(&m_mutex);}

private:
  // Non-copyable: a copy would unlock the same mutex a second time when it is
  // destroyed. Declared but intentionally never defined.
  MutexSentry(const MutexSentry &);
  MutexSentry &operator=(const MutexSentry &);

  pthread_mutex_t &m_mutex;

};
38 
39 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
40 IOSize
42 {
43  vector<IOPosBuffer> new_buf;
44  new_buf.reserve(n);
45  IOOffset off = 0;
46  for (IOSize i=0; i<n; i++) {
47  IOSize size = into[i].size();
48  new_buf[i] = IOPosBuffer(off, into[i].data(), size);
49  off += size;
50  }
51  return readv(&(new_buf[0]), n);
52 }
53 
54 /*
55  * A vectored scatter-gather read.
56  * Returns the total number of bytes successfully read.
57  *
58  */
59 IOSize
61 {
62  assert(m_client);
63 
64  // A trivial vector read - unlikely, considering ROOT data format.
65  if (unlikely(n == 0)) {
66  return 0;
67  }
68  if (unlikely(n == 1)) {
69  return read(into[0].data(), into[0].size(), into[0].offset());
70  }
71 
72  // The main challenge here is to turn the request into a form that can be
73  // fed to the Xrootd connection. In particular, Xrootd has a limit on the
74  // maximum number of chunks and the maximum size of each chunk. Hence, the
75  // loop unrolling.
76  //
77  IOSize total_len = 0;
78  readahead_list read_chunk_list[READV_MAXCHUNKS];
79  char *result_list[READV_MAXCHUNKS];
80  IOSize chunk_off = 0;
81  IOSize readv_total_len = 0;
82  const char * handle = m_client->GetHandle(); // also - 16 bytes offset from the location of m_client.
83  for (IOSize i = 0; i < n; ++i) {
84 
85  IOSize len = into[i].size();
86  if (unlikely(len > 0x7fffffff)) {
88  ex << "XrdFile::readv(name='" << m_name << "')[" << i
89  << "].size=" << len << " exceeds read size limit 0x7fffffff";
90  ex.addContext("Calling XrdFile::readv()");
91  addConnection(ex);
92  throw ex;
93  }
94 
95  IOOffset off = into[i].offset();
96  char *chunk_data = static_cast<char *>(into[i].data());
97  while (len > 0) { // Iterate as long as there is additional data to read.
98  // Each iteration will read up to READV_MAXCHUNKSIZE of this request.
99  IOSize chunk_size = len > READV_MAXCHUNKSIZE ? READV_MAXCHUNKSIZE : len;
100  len -= chunk_size;
101  readv_total_len += chunk_size;
102  read_chunk_list[chunk_off].rlen = chunk_size;
103  read_chunk_list[chunk_off].offset = off;
104  result_list[chunk_off] = chunk_data;
105  chunk_data += chunk_size;
106  off += chunk_size;
107  memcpy(&(read_chunk_list[chunk_off].fhandle), handle, 4);
108  chunk_off++;
109  if (chunk_off == READV_MAXCHUNKS) {
110  // Now that we have broken the readv into Xrootd-sized chunks, send the actual command.
111  // readv_send will also parse the response and place the data into the result_list buffers.
112  IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
113  total_len += tmp_total_len;
114  if (tmp_total_len != readv_total_len)
115  return total_len;
116  readv_total_len = 0;
117  chunk_off = 0;
118  }
119  }
120  }
121  // Do the actual readv for all remaining chunks.
122  if (chunk_off) {
123  total_len += readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
124  }
125  return total_len;
126 }
127 
128 /*
129  * Send the readv request to Xrootd.
130  * Returns the number of bytes stored into result_list.
131  * Assumes that read_chunk_list and result_list are of size n; the results of the reads
132  * described in read_chunk_list will be stored in the buffer pointed to by result_list.
133  * total_len should be the sum of the size of all reads.
134  */
135 IOSize
136 XrdFile::readv_send(char **result_list, readahead_list &read_chunk_list, IOSize n, IOSize total_len)
137 {
138  // Per the xrootd protocol document:
139  // Sending requests using the same streamid when a kXR_oksofar status code has been
140  // returned may produced unpredictable results. A client must serialize all requests
141  // using the streamid in the presence of partial results.
142 
143  XrdClientConn *xrdc = m_client->GetClientConn();
144  ClientRequest readvFileRequest;
145  memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
146 
147  kXR_unt16 sid = ConnectionManager->SidManager()->GetNewSid();
148  memcpy(readvFileRequest.header.streamid, &sid, sizeof(kXR_unt16));
149  readvFileRequest.header.requestid = kXR_readv;
150  readvFileRequest.readv.dlen = n * sizeof(struct readahead_list);
151 
152  std::vector<char> res_buf;
153  res_buf.reserve( total_len + (n * sizeof(struct readahead_list)) );
154 
155  // Encode, then send the command.
156  clientMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
157  bool success;
158  IOSize data_length;
159  {
160  MutexSentry sentry(m_readv_mutex);
161  success = xrdc->SendGenCommand(&readvFileRequest, &read_chunk_list, 0,
162  (void *)&(res_buf[0]), FALSE, (char *)"ReadV");
163  data_length = xrdc->LastServerResp.dlen;
164  }
165  clientUnMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
166 
167  ConnectionManager->SidManager()->ReleaseSid(sid);
168 
169  if (success) {
170  return readv_unpack(result_list, res_buf, data_length, read_chunk_list, n);
171  } else {
172  return 0;
173  }
174 
175 }
176 
177 /*
178  * Unpack the response buffer from Xrootd into the final results buffer.
179  */
180 IOSize
181 XrdFile::readv_unpack(char **result_list, std::vector<char> &result_buf, IOSize response_length, readahead_list &read_chunk_list, IOSize n)
182 {
183  IOSize response_offset = 0;
184  IOSize total_len = 0;
185  for (IOSize i = 0; i < n; i++) {
186 
187  if (unlikely(response_offset + sizeof(struct readahead_list) > response_length)) {
189  ex << "XrdFile::readv(name='" << m_name << "')[" << i
190  << "] returned an incorrectly-sized response (short header)";
191  ex.addContext("Calling XrdFile::readv()");
192  addConnection(ex);
193  }
194 
195  kXR_int64 offset;
196  kXR_int32 rlen;
197  { // Done as a separate block so response is not used later - as it is all in network order!
198  const readahead_list *response = reinterpret_cast<struct readahead_list*>(&result_buf[response_offset]);
199  offset = ntohll(response->offset);
200  rlen = ntohl(response->rlen);
201  }
202 
203  // Sanity / consistency checks; verify the results correspond to the requested chunk
204  // Also check that the response buffer is sufficient large to read from.
205  if (unlikely((&read_chunk_list)[i].offset != offset)) {
207  ex << "XrdFile::readv(name='" << m_name << "')[" << i
208  << "] returned offset " << offset << " does not match requested offset "
209  << (&read_chunk_list)[i].offset;
210  ex.addContext("Calling XrdFile::readv()");
211  addConnection(ex);
212  throw ex;
213  }
214  if (unlikely((&read_chunk_list)[i].rlen != rlen)) {
216  ex << "XrdFile::readv(name='" << m_name << "')[" << i
217  << "] returned size " << rlen << " does not match requested size "
218  << (&read_chunk_list)[i].rlen;
219  ex.addContext("Calling XrdFile::readv()");
220  addConnection(ex);
221  throw ex;
222  }
223  if (unlikely(response_offset + rlen > response_length)) {
225  ex << "XrdFile::readv(name='" << m_name << "')[" << i
226  << "] returned an incorrectly-sized response (short data)";
227  ex.addContext("Calling XrdFile::readv()");
228  addConnection(ex);
229  }
230 
231  response_offset += sizeof(struct readahead_list); // Data is stored after header.
232  total_len += rlen;
233  // Copy the data into place; increase the offset.
234  memcpy(result_list[i], &result_buf[response_offset], rlen);
235  response_offset += rlen;
236  }
237 
238  return total_len;
239 }
240 
int i
Definition: DBlmapReader.cc:9
static boost::mutex mutex
Definition: LHEProxy.cc:11
IOSize readv_send(char **result_buffer, readahead_list &read_chunk_list, IOSize n, IOSize total_len)
Definition: XrdReadv.cc:136
#define unlikely(x)
~MutexSentry()
Definition: XrdReadv.cc:32
tuple handle
Definition: patZpeak.py:22
virtual IOSize readv(IOBuffer *into, IOSize n)
Definition: XrdReadv.cc:41
int read(void)
Definition: IOInput.cc:54
unsigned int offset(bool)
XrdClient * m_client
Definition: XrdFile.h:61
IOOffset offset(void) const
Definition: IOPosBuffer.h:54
void * data(void) const
Definition: IOPosBuffer.h:59
IOSize size(void) const
Definition: IOBuffer.h:50
std::string m_name
Definition: XrdFile.h:65
pthread_mutex_t & m_mutex
Definition: XrdReadv.cc:35
IOSize size(void) const
Definition: IOPosBuffer.h:64
virtual IOOffset size(void) const
Definition: Storage.cc:102
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
pthread_mutex_t m_readv_mutex
Definition: XrdFile.h:68
IOSize readv_unpack(char **result_buffer, std::vector< char > &res_buf, IOSize datalen, readahead_list &read_chunk_list, IOSize n)
Definition: XrdReadv.cc:181
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
size_t IOSize
Definition: IOTypes.h:14
void addConnection(cms::Exception &)
Definition: XrdFile.cc:405
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of FALSE
MutexSentry(pthread_mutex_t &mutex)
Definition: XrdReadv.cc:30