00001 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
00002 #include "FWCore/ServiceRegistry/interface/Service.h"
00003 #include "Utilities/XrdAdaptor/src/XrdFile.h"
00004 #include "FWCore/Utilities/interface/EDMException.h"
00005 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00006 #include "FWCore/Utilities/interface/Likely.h"
00007 #include <vector>
00008 #include <sstream>
00009
00010 XrdFile::XrdFile (void)
00011 : m_client (0),
00012 m_offset (0),
00013 m_stat(),
00014 m_close (false),
00015 m_name()
00016 {
00017 memset(&m_stat, 0, sizeof (m_stat));
00018 pthread_mutex_init(&m_readv_mutex, 0);
00019 }
00020
00021 XrdFile::XrdFile (const char *name,
00022 int flags ,
00023 int perms )
00024 : m_client (0),
00025 m_offset (0),
00026 m_stat(),
00027 m_close (false),
00028 m_name()
00029 {
00030 memset(&m_stat, 0, sizeof (m_stat));
00031 pthread_mutex_init(&m_readv_mutex, 0);
00032 open (name, flags, perms);
00033 }
00034
00035 XrdFile::XrdFile (const std::string &name,
00036 int flags ,
00037 int perms )
00038 : m_client (0),
00039 m_offset (0),
00040 m_stat(),
00041 m_close (false),
00042 m_name()
00043 {
00044 memset(&m_stat, 0, sizeof (m_stat));
00045 pthread_mutex_init(&m_readv_mutex, 0);
00046 open (name.c_str (), flags, perms);
00047 }
00048
00049 XrdFile::~XrdFile (void)
00050 {
00051 if (m_close)
00052 edm::LogError("XrdFileError")
00053 << "Destructor called on XROOTD file '" << m_name
00054 << "' but the file is still open";
00055 pthread_mutex_destroy(&m_readv_mutex);
00056 }
00057
00059 void
00060 XrdFile::create (const char *name,
00061 bool exclusive ,
00062 int perms )
00063 {
00064 open (name,
00065 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00066 | (exclusive ? IOFlags::OpenExclusive : 0)),
00067 perms);
00068 }
00069
00070 void
00071 XrdFile::create (const std::string &name,
00072 bool exclusive ,
00073 int perms )
00074 {
00075 open (name.c_str (),
00076 (IOFlags::OpenCreate | IOFlags::OpenWrite | IOFlags::OpenTruncate
00077 | (exclusive ? IOFlags::OpenExclusive : 0)),
00078 perms);
00079 }
00080
00081 void
00082 XrdFile::open (const std::string &name,
00083 int flags ,
00084 int perms )
00085 { open (name.c_str (), flags, perms); }
00086
00087 void
00088 XrdFile::open (const char *name,
00089 int flags ,
00090 int perms )
00091 {
00092
00093 if ((name == 0) || (*name == 0)) {
00094 edm::Exception ex(edm::errors::FileOpenError);
00095 ex << "Cannot open a file without a name";
00096 ex.addContext("Calling XrdFile::open()");
00097 throw ex;
00098 }
00099 if ((flags & (IOFlags::OpenRead | IOFlags::OpenWrite)) == 0) {
00100 edm::Exception ex(edm::errors::FileOpenError);
00101 ex << "Must open file '" << name << "' at least for read or write";
00102 ex.addContext("Calling XrdFile::open()");
00103 throw ex;
00104 }
00105
00106 if (m_client && m_close)
00107 close();
00108 else
00109 abort();
00110
00111
00112 int openflags = 0;
00113
00114 if (flags & IOFlags::OpenWrite)
00115 openflags |= kXR_open_updt;
00116 else if (flags & IOFlags::OpenRead)
00117 openflags |= kXR_open_read;
00118
00119 if (flags & IOFlags::OpenAppend) {
00120 edm::Exception ex(edm::errors::FileOpenError);
00121 ex << "Opening file '" << name << "' in append mode not supported";
00122 ex.addContext("Calling XrdFile::open()");
00123 throw ex;
00124 }
00125
00126 if (flags & IOFlags::OpenCreate)
00127 {
00128 if (! (flags & IOFlags::OpenExclusive))
00129 openflags |= kXR_delete;
00130 openflags |= kXR_new;
00131 openflags |= kXR_mkpath;
00132 }
00133
00134 if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite))
00135 openflags |= kXR_delete;
00136
00137 m_name = name;
00138 m_client = new XrdClient(name);
00139 if (! m_client->Open(perms, openflags)
00140 || m_client->LastServerResp()->status != kXR_ok) {
00141 edm::Exception ex(edm::errors::FileOpenError);
00142 ex << "XrdClient::Open(name='" << name
00143 << "', flags=0x" << std::hex << openflags
00144 << ", permissions=0" << std::oct << perms << std::dec
00145 << ") => error '" << m_client->LastServerError()->errmsg
00146 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00147 ex.addContext("Calling XrdFile::open()");
00148 addConnection(ex);
00149 throw ex;
00150 }
00151 if (! m_client->Stat(&m_stat)) {
00152 edm::Exception ex(edm::errors::FileOpenError);
00153 ex << "XrdClient::Stat(name='" << name
00154 << ") => error '" << m_client->LastServerError()->errmsg
00155 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00156 ex.addContext("Calling XrdFile::open()");
00157 addConnection(ex);
00158 throw ex;
00159 }
00160 m_offset = 0;
00161 m_close = true;
00162
00163
00164
00165 const char * crabJobId = edm::storage::StatisticsSenderService::getJobID();
00166 if (crabJobId) {
00167 kXR_unt32 dictId;
00168 m_client->SendMonitoringInfo(crabJobId, &dictId);
00169 edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << ".";
00170 }
00171
00172 edm::LogInfo("XrdFileInfo") << "Opened " << m_name;
00173
00174 XrdClientConn *conn = m_client->GetClientConn();
00175 edm::LogInfo("XrdFileInfo") << "Connection URL " << conn->GetCurrentUrl().GetUrl().c_str();
00176
00177 const char * host = conn->GetCurrentUrl().Host.c_str();
00178 edm::Service<edm::storage::StatisticsSenderService> statsService;
00179 if (statsService.isAvailable()) {
00180 statsService->setCurrentServer(host);
00181 }
00182 }
00183
00184 void
00185 XrdFile::close (void)
00186 {
00187 if (! m_client)
00188 {
00189 edm::LogError("XrdFileError")
00190 << "XrdFile::close(name='" << m_name
00191 << "') called but the file is not open";
00192 m_close = false;
00193 return;
00194 }
00195
00196 if (! m_client->Close())
00197 edm::LogWarning("XrdFileWarning")
00198 << "XrdFile::close(name='" << m_name
00199 << "') failed with error '" << m_client->LastServerError()->errmsg
00200 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00201 delete m_client;
00202 m_client = 0;
00203
00204 m_close = false;
00205 m_offset = 0;
00206 memset(&m_stat, 0, sizeof (m_stat));
00207 edm::LogInfo("XrdFileInfo") << "Closed " << m_name;
00208 }
00209
00210 void
00211 XrdFile::abort (void)
00212 {
00213 delete m_client;
00214 m_client = 0;
00215 m_close = false;
00216 m_offset = 0;
00217 memset(&m_stat, 0, sizeof (m_stat));
00218 }
00219
00221 IOSize
00222 XrdFile::read (void *into, IOSize n)
00223 {
00224 if (n > 0x7fffffff) {
00225 edm::Exception ex(edm::errors::FileReadError);
00226 ex << "XrdFile::read(name='" << m_name << "', n=" << n
00227 << ") too many bytes, limit is 0x7fffffff";
00228 ex.addContext("Calling XrdFile::read()");
00229 addConnection(ex);
00230 throw ex;
00231 }
00232 int s = m_client->Read(into, m_offset, n);
00233 if (s < 0) {
00234 edm::Exception ex(edm::errors::FileReadError);
00235 ex << "XrdClient::Read(name='" << m_name
00236 << "', offset=" << m_offset << ", n=" << n
00237 << ") failed with error '" << m_client->LastServerError()->errmsg
00238 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00239 ex.addContext("Calling XrdFile::read()");
00240 addConnection(ex);
00241 throw ex;
00242 }
00243 m_offset += s;
00244 return s;
00245 }
00246
00247 IOSize
00248 XrdFile::read (void *into, IOSize n, IOOffset pos)
00249 {
00250 if (n > 0x7fffffff) {
00251 edm::Exception ex(edm::errors::FileReadError);
00252 ex << "XrdFile::read(name='" << m_name << "', n=" << n
00253 << ") exceeds read size limit 0x7fffffff";
00254 ex.addContext("Calling XrdFile::read()");
00255 addConnection(ex);
00256 throw ex;
00257 }
00258 int s = m_client->Read(into, pos, n);
00259 if (s < 0) {
00260 edm::Exception ex(edm::errors::FileReadError);
00261 ex << "XrdClient::Read(name='" << m_name
00262 << "', offset=" << m_offset << ", n=" << n
00263 << ") failed with error '" << m_client->LastServerError()->errmsg
00264 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00265 ex.addContext("Calling XrdFile::read()");
00266 addConnection(ex);
00267 throw ex;
00268 }
00269 return s;
00270 }
00271
00272 IOSize
00273 XrdFile::write (const void *from, IOSize n)
00274 {
00275 if (n > 0x7fffffff) {
00276 cms::Exception ex("FileWriteError");
00277 ex << "XrdFile::write(name='" << m_name << "', n=" << n
00278 << ") too many bytes, limit is 0x7fffffff";
00279 ex.addContext("Calling XrdFile::write()");
00280 addConnection(ex);
00281 throw ex;
00282 }
00283 ssize_t s = m_client->Write(from, m_offset, n);
00284 if (s < 0) {
00285 cms::Exception ex("FileWriteError");
00286 ex << "XrdFile::write(name='" << m_name << "', n=" << n
00287 << ") failed with error '" << m_client->LastServerError()->errmsg
00288 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00289 ex.addContext("Calling XrdFile::write()");
00290 addConnection(ex);
00291 throw ex;
00292 }
00293 m_offset += s;
00294 if (m_offset > m_stat.size)
00295 m_stat.size = m_offset;
00296
00297 return s;
00298 }
00299
00300 IOSize
00301 XrdFile::write (const void *from, IOSize n, IOOffset pos)
00302 {
00303 if (n > 0x7fffffff) {
00304 cms::Exception ex("FileWriteError");
00305 ex << "XrdFile::write(name='" << m_name << "', n=" << n
00306 << ") too many bytes, limit is 0x7fffffff";
00307 ex.addContext("Calling XrdFile::write()");
00308 addConnection(ex);
00309 throw ex;
00310 }
00311 ssize_t s = m_client->Write(from, pos, n);
00312 if (s < 0) {
00313 cms::Exception ex("FileWriteError");
00314 ex << "XrdFile::write(name='" << m_name << "', n=" << n
00315 << ") failed with error '" << m_client->LastServerError()->errmsg
00316 << "' (errno=" << m_client->LastServerError()->errnum << ")";
00317 ex.addContext("Calling XrdFile::write()");
00318 addConnection(ex);
00319 throw ex;
00320 }
00321 if (pos + s > m_stat.size)
00322 m_stat.size = pos + s;
00323
00324 return s;
00325 }
00326
00327 bool
00328 XrdFile::prefetch (const IOPosBuffer *what, IOSize n)
00329 {
00330
00331
00332
00333 if (unlikely((n == 1) && (what[0].offset() == 0) && (what[0].size() == PREFETCH_PROBE_LENGTH))) {
00334 return false;
00335 }
00336 std::vector<long long> offsets; offsets.resize(n);
00337 std::vector<int> lens; lens.resize(n);
00338 kXR_int64 total = 0;
00339 for (IOSize i = 0; i < n; ++i) {
00340 offsets[i] = what[i].offset();
00341 lens[i] = what[i].size();
00342 total += what[i].size();
00343 }
00344
00345 kXR_int64 r = m_client->ReadV(NULL, &offsets[0], &lens[0], n);
00346 return r == total;
00347 }
00348
00352 IOOffset
00353 XrdFile::position (IOOffset offset, Relative whence )
00354 {
00355 if (! m_client) {
00356 cms::Exception ex("FilePositionError");
00357 ex << "XrdFile::position() called on a closed file";
00358 ex.addContext("Calling XrdFile::position()");
00359 addConnection(ex);
00360 throw ex;
00361 }
00362 switch (whence)
00363 {
00364 case SET:
00365 m_offset = offset;
00366 break;
00367
00368 case CURRENT:
00369 m_offset += offset;
00370 break;
00371
00372 case END:
00373 m_offset = m_stat.size + offset;
00374 break;
00375
00376 default:
00377 cms::Exception ex("FilePositionError");
00378 ex << "XrdFile::position() called with incorrect 'whence' parameter";
00379 ex.addContext("Calling XrdFile::position()");
00380 addConnection(ex);
00381 throw ex;
00382 }
00383
00384 if (m_offset < 0)
00385 m_offset = 0;
00386 if (m_offset > m_stat.size)
00387 m_stat.size = m_offset;
00388
00389 return m_offset;
00390 }
00391
00392 void
00393 XrdFile::resize (IOOffset )
00394 {
00395 cms::Exception ex("FileResizeError");
00396 ex << "XrdFile::resize(name='" << m_name << "') not implemented";
00397 ex.addContext("Calling XrdFile::resize()");
00398 addConnection(ex);
00399 throw ex;
00400 }
00401
00402 void
00403 XrdFile::addConnection (cms::Exception &ex)
00404 {
00405 XrdClientConn *conn = m_client->GetClientConn();
00406 if (conn) {
00407 std::stringstream ss;
00408 ss << "Current server connection: " << conn->GetCurrentUrl().GetUrl().c_str();
00409 ex.addAdditionalInfo(ss.str());
00410 }
00411 }
00412