CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_1/src/Utilities/XrdAdaptor/src/XrdReadv.cc

Go to the documentation of this file.
00001 
/*
 * These functions are a re-implementation of upstream's readv.
 * The important difference is that we perform vectored scatter-gather IO:
 * in the upstream readv, the vectored IO goes into one buffer - it is
 * not scatter-gathered.
 *
 * CMSSW now uses scatter-gather in the TFileAdapter's ReadRepacker.
 * Emulating that on top of XrdClient::ReadV would be horribly slow, so we
 * build and send the kXR_readv requests ourselves.
 *
 * Why not continue to use the XrdClient's internal cache?  Each time we use a
 * different TTC, it invalidates the cache.  So, the internal cache and our
 * trigger-pattern TTC usage force the use of readv instead of prefetch.
 *
 */
00016 
00017 #include "Utilities/XrdAdaptor/src/XrdFile.h"
00018 #include "XProtocol/XProtocol.hh"
00019 #include "XrdClient/XrdClientProtocol.hh"
00020 #include "XrdClient/XrdClientConst.hh"
00021 #include "XrdClient/XrdClientSid.hh"
00022 #include "FWCore/Utilities/interface/EDMException.h"
00023 #include "FWCore/Utilities/interface/Likely.h"
00024 
00025 #include <assert.h>
00026 
/*
 * Scoped lock for a pthread mutex: the constructor locks the mutex and the
 * destructor unlocks it, so the mutex is released on every exit from the
 * enclosing scope, including when an exception propagates.
 *
 * The sentry only holds a reference; the mutex must outlive it.
 * Copying is disabled because a copy would unlock the same mutex twice.
 */
class MutexSentry
{
public:
  explicit MutexSentry(pthread_mutex_t &mutex) : m_mutex(mutex) {pthread_mutex_lock(&m_mutex);}

  ~MutexSentry() {pthread_mutex_unlock(&m_mutex);}

private:
  // Non-copyable: declared but intentionally not defined.
  MutexSentry(const MutexSentry &);
  MutexSentry &operator=(const MutexSentry &);

  pthread_mutex_t &m_mutex;
};
00038 
00039 // This method is rarely used by CMS; hence, it is a small wrapper and not efficient.
00040 IOSize
00041 XrdFile::readv (IOBuffer *into, IOSize n)
00042 {
00043   vector<IOPosBuffer> new_buf;
00044   new_buf.reserve(n);
00045   IOOffset off = 0;
00046   for (IOSize i=0; i<n; i++) {
00047     IOSize size = into[i].size();
00048     new_buf[i] = IOPosBuffer(off, into[i].data(), size);
00049     off += size;
00050   }
00051   return readv(&(new_buf[0]), n);
00052 }
00053 
00054 /*
00055  * A vectored scatter-gather read.
00056  * Returns the total number of bytes successfully read.
00057  *
00058  */
00059 IOSize
00060 XrdFile::readv (IOPosBuffer *into, IOSize n)
00061 {
00062   assert(m_client);
00063   
00064   // A trivial vector read - unlikely, considering ROOT data format.
00065   if (unlikely(n == 0)) {
00066     return 0;
00067   }
00068   if (unlikely(n == 1)) {
00069     return read(into[0].data(), into[0].size(), into[0].offset());
00070   }
00071 
00072   // The main challenge here is to turn the request into a form that can be
00073   // fed to the Xrootd connection.  In particular, Xrootd has a limit on the
00074   // maximum number of chunks and the maximum size of each chunk.  Hence, the
00075   // loop unrolling.
00076   // 
00077   IOSize total_len = 0;
00078   readahead_list read_chunk_list[READV_MAXCHUNKS];
00079   char *result_list[READV_MAXCHUNKS];
00080   IOSize chunk_off = 0;
00081   IOSize readv_total_len = 0;
00082   const char * handle = m_client->GetHandle(); // also - 16 bytes offset from the location of m_client.
00083   for (IOSize i = 0; i < n; ++i) {
00084 
00085     IOSize len = into[i].size();
00086     if (unlikely(len > 0x7fffffff)) {
00087       edm::Exception ex(edm::errors::FileReadError);
00088       ex << "XrdFile::readv(name='" << m_name << "')[" << i
00089          << "].size=" << len << " exceeds read size limit 0x7fffffff";
00090       ex.addContext("Calling XrdFile::readv()");
00091       addConnection(ex);
00092       throw ex;
00093     }
00094 
00095     IOOffset off = into[i].offset();
00096     char *chunk_data = static_cast<char *>(into[i].data());
00097     while (len > 0) { // Iterate as long as there is additional data to read.
00098                       // Each iteration will read up to READV_MAXCHUNKSIZE of this request.
00099       IOSize chunk_size = len > READV_MAXCHUNKSIZE ? READV_MAXCHUNKSIZE : len;
00100       len -= chunk_size;
00101       readv_total_len += chunk_size;
00102       read_chunk_list[chunk_off].rlen = chunk_size;
00103       read_chunk_list[chunk_off].offset = off;
00104       result_list[chunk_off] = chunk_data;
00105       chunk_data += chunk_size;
00106       off += chunk_size;
00107       memcpy(&(read_chunk_list[chunk_off].fhandle), handle, 4);
00108       chunk_off++;
00109       if (chunk_off == READV_MAXCHUNKS) {
00110         // Now that we have broken the readv into Xrootd-sized chunks, send the actual command.
00111         // readv_send will also parse the response and place the data into the result_list buffers.
00112         IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
00113         total_len += tmp_total_len;
00114         if (tmp_total_len != readv_total_len)
00115           return total_len;
00116         readv_total_len = 0;
00117         chunk_off = 0;
00118       }
00119     }
00120   }
00121   // Do the actual readv for all remaining chunks.
00122   if (chunk_off) {
00123     total_len += readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
00124   }
00125   return total_len;
00126 }
00127 
00128 /*
00129  * Send the readv request to Xrootd.
00130  * Returns the number of bytes stored into result_list.
00131  * Assumes that read_chunk_list and result_list are of size n; the results of the reads
00132  * described in read_chunk_list will be stored in the buffer pointed to by result_list.
00133  * total_len should be the sum of the size of all reads.
00134  */
00135 IOSize
00136 XrdFile::readv_send(char **result_list, readahead_list &read_chunk_list, IOSize n, IOSize total_len)
00137 {
00138   // Per the xrootd protocol document:
00139   // Sending requests using the same streamid when a kXR_oksofar status code has been 
00140   // returned may produced unpredictable results. A client must serialize all requests 
00141   // using the streamid in the presence of partial results.
00142 
00143   XrdClientConn *xrdc = m_client->GetClientConn();
00144   ClientRequest readvFileRequest;
00145   memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
00146   
00147   kXR_unt16 sid = ConnectionManager->SidManager()->GetNewSid();
00148   memcpy(readvFileRequest.header.streamid, &sid, sizeof(kXR_unt16));
00149   readvFileRequest.header.requestid = kXR_readv;
00150   readvFileRequest.readv.dlen = n * sizeof(struct readahead_list);
00151 
00152   std::vector<char> res_buf;
00153   res_buf.reserve( total_len + (n * sizeof(struct readahead_list)) );
00154 
00155   // Encode, then send the command.
00156   clientMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
00157   bool success;
00158   IOSize data_length;
00159   {
00160     MutexSentry sentry(m_readv_mutex);
00161     success = xrdc->SendGenCommand(&readvFileRequest, &read_chunk_list, 0,
00162                                    (void *)&(res_buf[0]), FALSE, (char *)"ReadV");
00163     data_length = xrdc->LastServerResp.dlen;
00164   }
00165   clientUnMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
00166 
00167   ConnectionManager->SidManager()->ReleaseSid(sid);
00168 
00169   if (success) {
00170     return readv_unpack(result_list, res_buf, data_length, read_chunk_list, n);
00171   } else {
00172     return 0;
00173   }
00174 
00175 }
00176 
00177 /*
00178  * Unpack the response buffer from Xrootd into the final results buffer.
00179  */
00180 IOSize
00181 XrdFile::readv_unpack(char **result_list, std::vector<char> &result_buf, IOSize response_length, readahead_list &read_chunk_list, IOSize n)
00182 {
00183   IOSize response_offset = 0;
00184   IOSize total_len = 0;
00185   for (IOSize i = 0; i < n; i++) {
00186 
00187     if (unlikely(response_offset + sizeof(struct readahead_list) > response_length)) {
00188       edm::Exception ex(edm::errors::FileReadError);
00189       ex << "XrdFile::readv(name='" << m_name << "')[" << i
00190          << "] returned an incorrectly-sized response (short header)";
00191       ex.addContext("Calling XrdFile::readv()");
00192       addConnection(ex);
00193     }
00194 
00195     kXR_int64 offset;
00196     kXR_int32 rlen;
00197     { // Done as a separate block so response is not used later - as it is all in network order!
00198       const readahead_list *response = reinterpret_cast<struct readahead_list*>(&result_buf[response_offset]);
00199       offset = ntohll(response->offset);
00200       rlen = ntohl(response->rlen);
00201     }
00202 
00203     // Sanity / consistency checks; verify the results correspond to the requested chunk
00204     // Also check that the response buffer is sufficient large to read from.
00205     if (unlikely((&read_chunk_list)[i].offset != offset)) {
00206       edm::Exception ex(edm::errors::FileReadError);
00207       ex << "XrdFile::readv(name='" << m_name << "')[" << i
00208          << "] returned offset " << offset << " does not match requested offset "
00209          << (&read_chunk_list)[i].offset; 
00210       ex.addContext("Calling XrdFile::readv()");
00211       addConnection(ex);
00212       throw ex;
00213     }
00214     if (unlikely((&read_chunk_list)[i].rlen != rlen)) {
00215       edm::Exception ex(edm::errors::FileReadError);
00216       ex << "XrdFile::readv(name='" << m_name << "')[" << i
00217          << "] returned size " << rlen << " does not match requested size "
00218          << (&read_chunk_list)[i].rlen;
00219       ex.addContext("Calling XrdFile::readv()");
00220       addConnection(ex);
00221       throw ex;
00222     }
00223     if (unlikely(response_offset + rlen > response_length)) {
00224       edm::Exception ex(edm::errors::FileReadError);
00225       ex << "XrdFile::readv(name='" << m_name << "')[" << i
00226          << "] returned an incorrectly-sized response (short data)";
00227       ex.addContext("Calling XrdFile::readv()");
00228       addConnection(ex);
00229     }
00230 
00231     response_offset += sizeof(struct readahead_list); // Data is stored after header.
00232     total_len += rlen;
00233     // Copy the data into place; increase the offset.
00234     memcpy(result_list[i], &result_buf[response_offset], rlen);
00235     response_offset += rlen;
00236   }
00237 
00238   return total_len;
00239 }
00240