00001 #include "Utilities/StorageFactory/interface/StorageMaker.h"
00002 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00003 #include "Utilities/StorageFactory/interface/StorageMakerFactory.h"
00004 #include "Utilities/StorageFactory/interface/StorageAccountProxy.h"
00005 #include "Utilities/StorageFactory/interface/LocalCacheFile.h"
00006 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00007 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00008 #include "Utilities/RFIOAdaptor/interface/RFIOPluginFactory.h"
00009 #include "FWCore/ServiceRegistry/interface/Service.h"
00010 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
00011 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00012 #include "FWCore/Utilities/interface/Exception.h"
00013 #include <cstdlib>
00014 #include "shift/stager_api.h"
00015
00016 class RFIOStorageMaker : public StorageMaker
00017 {
00021 std::string normalise (const std::string &path)
00022 {
00023 std::string prefix;
00024
00025 size_t suffix = path.find("?");
00026 if (suffix == std::string::npos)
00027 {
00028
00029 suffix = 0;
00030 if (path.find (":") == std::string::npos)
00031 {
00032 size_t e = path.find_first_not_of ("/");
00033 if (e != std::string::npos)
00034 {
00035 size_t c = path.find("/castor/");
00036 if ((c != std::string::npos) && (c == e-1))
00037 {
00038
00039 suffix = c;
00040 prefix = "rfio:///?path=";
00041 }
00042 else
00043 {
00044 c = path.find("/dpm/");
00045 if ((c != std::string::npos) && (c == e-1))
00046 {
00047
00048 suffix = c;
00049 prefix = "rfio://";
00050 }
00051 }
00052 }
00053 }
00054 }
00055 else
00056 {
00057
00058 prefix = path.substr(0, suffix);
00059 size_t h = prefix.find_first_not_of('/');
00060 size_t s = prefix.find_last_not_of('/');
00061 prefix.resize(s+1);
00062 prefix.replace(0,h,"rfio://");
00063 prefix += '/';
00064 }
00065
00066 return prefix + path.substr(suffix);
00067 }
00068
00069 public:
00070 RFIOStorageMaker()
00071 {
00072 std::string rfiotype("");
00073 bool err = false;
00074 try
00075 {
00076 edm::Service<edm::SiteLocalConfig> siteconfig;
00077 if (!siteconfig.isAvailable())
00078 err = true;
00079 else
00080 rfiotype = siteconfig->rfioType();
00081 }
00082 catch (const cms::Exception &e)
00083 {
00084 err = true;
00085 }
00086
00087 if (err)
00088 edm::LogWarning("RFIOStorageMaker")
00089 << "SiteLocalConfig Failed: SiteLocalConfigService is not loaded yet."
00090 << "Will use default 'castor' RFIO implementation.";
00091
00092 if (rfiotype.empty())
00093 rfiotype = "castor";
00094
00095
00096 putenv("RFIO_TCP_NODELAY=1");
00097
00098 RFIOPluginFactory::get()->create(rfiotype);
00099 Cthread_init();
00100 }
00101
00102 virtual Storage *open (const std::string &proto,
00103 const std::string &path,
00104 int mode,
00105 const std::string &tmpdir)
00106 {
00107 StorageFactory *f = StorageFactory::get();
00108 StorageFactory::ReadHint readHint = f->readHint();
00109 StorageFactory::CacheHint cacheHint = f->cacheHint();
00110
00111 if (readHint != StorageFactory::READ_HINT_UNBUFFERED
00112 || cacheHint == StorageFactory::CACHE_HINT_STORAGE)
00113 mode &= ~IOFlags::OpenUnbuffered;
00114 else
00115 mode |= IOFlags::OpenUnbuffered;
00116
00117 Storage *file = new RFIOFile(normalise(path), mode);
00118 if ((cacheHint == StorageFactory::CACHE_HINT_LAZY_DOWNLOAD
00119 || cacheHint == StorageFactory::CACHE_HINT_AUTO_DETECT)
00120 && ! (mode & IOFlags::OpenWrite))
00121 {
00122 if (f->accounting())
00123 file = new StorageAccountProxy(proto, file);
00124 file = new LocalCacheFile(file, f->tempDir());
00125 }
00126 return file;
00127 }
00128
00129 virtual void stagein (const std::string &proto,
00130 const std::string &path)
00131 {
00132 std::string npath = normalise(path);
00133 size_t castor = npath.find("?path=/castor/");
00134 size_t rest = npath.find("&");
00135 if (proto != "rfio" || castor == std::string::npos)
00136 return;
00137
00138 castor += 6;
00139 size_t len = (rest == std::string::npos ? rest : rest-castor);
00140 std::string stagepath(npath, castor, len);
00141
00142 stage_options opts;
00143 opts.stage_host = getenv("STAGE_HOST");
00144 opts.service_class = getenv("STAGE_SVCCLASS");
00145 opts.stage_port = 0;
00146 opts.stage_version = 2;
00147
00148 stage_prepareToGet_filereq req;
00149 req.protocol = (char *) "rfio";
00150 req.filename = (char *) stagepath.c_str();
00151 req.priority = 0;
00152
00153 int nresp = 0;
00154 stage_prepareToGet_fileresp *resp = 0;
00155 int rc = stage_prepareToGet(0, &req, 1, &resp, &nresp, 0, &opts);
00156 if (rc < 0)
00157 throw cms::Exception("RFIOStorageMaker::stagein()")
00158 << "Error while staging in '" << stagepath
00159 << "', error was: " << rfio_serror()
00160 << " (serrno=" << serrno << ")";
00161
00162 if (nresp == 1 && resp->errorCode != 0)
00163 throw cms::Exception("RFIOStorageMaker::stagein()")
00164 << "Error while staging in '" << stagepath
00165 << "', stagein error was: " << resp->errorMessage
00166 << " (code=" << resp->errorCode << ")";
00167
00168 free(resp->filename);
00169 free(resp->errorMessage);
00170 free(resp);
00171 }
00172
00173 virtual bool check (const std::string &proto,
00174 const std::string &path,
00175 IOOffset *size = 0)
00176 {
00177 std::string npath = normalise(path);
00178 if (rfio_access(npath.c_str (), R_OK) != 0)
00179 return false;
00180
00181 if (size)
00182 {
00183 struct stat buf;
00184 if (rfio_stat64(npath.c_str (), &buf) != 0)
00185 return false;
00186
00187 *size = buf.st_size;
00188 }
00189
00190 return true;
00191 }
00192 };
00193
// Plugin registration: passes StorageMakerFactory, RFIOStorageMaker and the
// label "rfio" to DEFINE_EDM_PLUGIN (macro defined outside this file;
// presumably it makes this maker available under the "rfio" protocol name).
00194 DEFINE_EDM_PLUGIN (StorageMakerFactory, RFIOStorageMaker, "rfio");