Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "Utilities/XrdAdaptor/src/XrdFile.h"
00018 #include "XProtocol/XProtocol.hh"
00019 #include "XrdClient/XrdClientProtocol.hh"
00020 #include "XrdClient/XrdClientConst.hh"
00021 #include "XrdClient/XrdClientSid.hh"
00022 #include "FWCore/Utilities/interface/EDMException.h"
00023 #include "FWCore/Utilities/interface/Likely.h"
00024
00025 #include <assert.h>
00026
00027 class MutexSentry
00028 {
00029 public:
00030 MutexSentry(pthread_mutex_t &mutex) : m_mutex(mutex) {pthread_mutex_lock(&m_mutex);}
00031
00032 ~MutexSentry() {pthread_mutex_unlock(&m_mutex);}
00033
00034 private:
00035 pthread_mutex_t &m_mutex;
00036
00037 };
00038
00039
00040 IOSize
00041 XrdFile::readv (IOBuffer *into, IOSize n)
00042 {
00043 vector<IOPosBuffer> new_buf;
00044 new_buf.reserve(n);
00045 IOOffset off = 0;
00046 for (IOSize i=0; i<n; i++) {
00047 IOSize size = into[i].size();
00048 new_buf[i] = IOPosBuffer(off, into[i].data(), size);
00049 off += size;
00050 }
00051 return readv(&(new_buf[0]), n);
00052 }
00053
00054
00055
00056
00057
00058
00059 IOSize
00060 XrdFile::readv (IOPosBuffer *into, IOSize n)
00061 {
00062 assert(m_client);
00063
00064
00065 if (unlikely(n == 0)) {
00066 return 0;
00067 }
00068 if (unlikely(n == 1)) {
00069 return read(into[0].data(), into[0].size(), into[0].offset());
00070 }
00071
00072
00073
00074
00075
00076
00077 IOSize total_len = 0;
00078 readahead_list read_chunk_list[READV_MAXCHUNKS];
00079 char *result_list[READV_MAXCHUNKS];
00080 IOSize chunk_off = 0;
00081 IOSize readv_total_len = 0;
00082 const char * handle = m_client->GetHandle();
00083 for (IOSize i = 0; i < n; ++i) {
00084
00085 IOSize len = into[i].size();
00086 if (unlikely(len > 0x7fffffff)) {
00087 edm::Exception ex(edm::errors::FileReadError);
00088 ex << "XrdFile::readv(name='" << m_name << "')[" << i
00089 << "].size=" << len << " exceeds read size limit 0x7fffffff";
00090 ex.addContext("Calling XrdFile::readv()");
00091 addConnection(ex);
00092 throw ex;
00093 }
00094
00095 IOOffset off = into[i].offset();
00096 char *chunk_data = static_cast<char *>(into[i].data());
00097 while (len > 0) {
00098
00099 IOSize chunk_size = len > READV_MAXCHUNKSIZE ? READV_MAXCHUNKSIZE : len;
00100 len -= chunk_size;
00101 readv_total_len += chunk_size;
00102 read_chunk_list[chunk_off].rlen = chunk_size;
00103 read_chunk_list[chunk_off].offset = off;
00104 result_list[chunk_off] = chunk_data;
00105 chunk_data += chunk_size;
00106 off += chunk_size;
00107 memcpy(&(read_chunk_list[chunk_off].fhandle), handle, 4);
00108 chunk_off++;
00109 if (chunk_off == READV_MAXCHUNKS) {
00110
00111
00112 IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
00113 total_len += tmp_total_len;
00114 if (tmp_total_len != readv_total_len)
00115 return total_len;
00116 readv_total_len = 0;
00117 chunk_off = 0;
00118 }
00119 }
00120 }
00121
00122 if (chunk_off) {
00123 total_len += readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len);
00124 }
00125 return total_len;
00126 }
00127
00128
00129
00130
00131
00132
00133
00134
00135 IOSize
00136 XrdFile::readv_send(char **result_list, readahead_list &read_chunk_list, IOSize n, IOSize total_len)
00137 {
00138
00139
00140
00141
00142
00143 XrdClientConn *xrdc = m_client->GetClientConn();
00144 ClientRequest readvFileRequest;
00145 memset( &readvFileRequest, 0, sizeof(readvFileRequest) );
00146
00147 kXR_unt16 sid = ConnectionManager->SidManager()->GetNewSid();
00148 memcpy(readvFileRequest.header.streamid, &sid, sizeof(kXR_unt16));
00149 readvFileRequest.header.requestid = kXR_readv;
00150 readvFileRequest.readv.dlen = n * sizeof(struct readahead_list);
00151
00152 std::vector<char> res_buf;
00153 res_buf.reserve( total_len + (n * sizeof(struct readahead_list)) );
00154
00155
00156 clientMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
00157 bool success;
00158 IOSize data_length;
00159 {
00160 MutexSentry sentry(m_readv_mutex);
00161 success = xrdc->SendGenCommand(&readvFileRequest, &read_chunk_list, 0,
00162 (void *)&(res_buf[0]), FALSE, (char *)"ReadV");
00163 data_length = xrdc->LastServerResp.dlen;
00164 }
00165 clientUnMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen);
00166
00167 ConnectionManager->SidManager()->ReleaseSid(sid);
00168
00169 if (success) {
00170 return readv_unpack(result_list, res_buf, data_length, read_chunk_list, n);
00171 } else {
00172 return 0;
00173 }
00174
00175 }
00176
00177
00178
00179
00180 IOSize
00181 XrdFile::readv_unpack(char **result_list, std::vector<char> &result_buf, IOSize response_length, readahead_list &read_chunk_list, IOSize n)
00182 {
00183 IOSize response_offset = 0;
00184 IOSize total_len = 0;
00185 for (IOSize i = 0; i < n; i++) {
00186
00187 if (unlikely(response_offset + sizeof(struct readahead_list) > response_length)) {
00188 edm::Exception ex(edm::errors::FileReadError);
00189 ex << "XrdFile::readv(name='" << m_name << "')[" << i
00190 << "] returned an incorrectly-sized response (short header)";
00191 ex.addContext("Calling XrdFile::readv()");
00192 addConnection(ex);
00193 }
00194
00195 kXR_int64 offset;
00196 kXR_int32 rlen;
00197 {
00198 const readahead_list *response = reinterpret_cast<struct readahead_list*>(&result_buf[response_offset]);
00199 offset = ntohll(response->offset);
00200 rlen = ntohl(response->rlen);
00201 }
00202
00203
00204
00205 if (unlikely((&read_chunk_list)[i].offset != offset)) {
00206 edm::Exception ex(edm::errors::FileReadError);
00207 ex << "XrdFile::readv(name='" << m_name << "')[" << i
00208 << "] returned offset " << offset << " does not match requested offset "
00209 << (&read_chunk_list)[i].offset;
00210 ex.addContext("Calling XrdFile::readv()");
00211 addConnection(ex);
00212 throw ex;
00213 }
00214 if (unlikely((&read_chunk_list)[i].rlen != rlen)) {
00215 edm::Exception ex(edm::errors::FileReadError);
00216 ex << "XrdFile::readv(name='" << m_name << "')[" << i
00217 << "] returned size " << rlen << " does not match requested size "
00218 << (&read_chunk_list)[i].rlen;
00219 ex.addContext("Calling XrdFile::readv()");
00220 addConnection(ex);
00221 throw ex;
00222 }
00223 if (unlikely(response_offset + rlen > response_length)) {
00224 edm::Exception ex(edm::errors::FileReadError);
00225 ex << "XrdFile::readv(name='" << m_name << "')[" << i
00226 << "] returned an incorrectly-sized response (short data)";
00227 ex.addContext("Calling XrdFile::readv()");
00228 addConnection(ex);
00229 }
00230
00231 response_offset += sizeof(struct readahead_list);
00232 total_len += rlen;
00233
00234 memcpy(result_list[i], &result_buf[response_offset], rlen);
00235 response_offset += rlen;
00236 }
00237
00238 return total_len;
00239 }
00240