Go to the documentation of this file.00001 #include "Utilities/StorageFactory/interface/StorageMaker.h"
00002 #include "Utilities/StorageFactory/interface/StorageFactory.h"
00003 #include "Utilities/StorageFactory/interface/StorageMakerFactory.h"
00004 #include "Utilities/RFIOAdaptor/interface/RFIOFile.h"
00005 #include "Utilities/RFIOAdaptor/interface/RFIO.h"
00006 #include "Utilities/RFIOAdaptor/interface/RFIOPluginFactory.h"
00007 #include "FWCore/ServiceRegistry/interface/Service.h"
00008 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
00009 #include "FWCore/MessageLogger/interface/MessageLogger.h"
00010 #include "FWCore/Utilities/interface/Exception.h"
00011 #include <cstdlib>
00012 #include "shift/stager_api.h"
00013
00014 class RFIOStorageMaker : public StorageMaker
00015 {
00019 std::string normalise (const std::string &path)
00020 {
00021 std::string prefix;
00022
00023 size_t suffix = path.find("?");
00024 if (suffix == std::string::npos)
00025 {
00026
00027 suffix = 0;
00028 if (path.find (":") == std::string::npos)
00029 {
00030 size_t e = path.find_first_not_of ("/");
00031 if (e != std::string::npos)
00032 {
00033 size_t c = path.find("/castor/");
00034 if ((c != std::string::npos) && (c == e-1))
00035 {
00036
00037 suffix = c;
00038 prefix = "rfio:///?path=";
00039 }
00040 else
00041 {
00042 c = path.find("/dpm/");
00043 if ((c != std::string::npos) && (c == e-1))
00044 {
00045
00046 suffix = c;
00047 prefix = "rfio://";
00048 }
00049 }
00050 }
00051 }
00052 }
00053 else
00054 {
00055
00056 prefix = path.substr(0, suffix);
00057 size_t h = prefix.find_first_not_of('/');
00058 size_t s = prefix.find_last_not_of('/');
00059 prefix.resize(s+1);
00060 prefix.replace(0,h,"rfio://");
00061 prefix += '/';
00062 }
00063
00064 return prefix + path.substr(suffix);
00065 }
00066
00067 public:
00068 RFIOStorageMaker()
00069 {
00070 std::string rfiotype("");
00071 bool err = false;
00072 try
00073 {
00074 edm::Service<edm::SiteLocalConfig> siteconfig;
00075 if (!siteconfig.isAvailable())
00076 err = true;
00077 else
00078 rfiotype = siteconfig->rfioType();
00079 }
00080 catch (const cms::Exception &e)
00081 {
00082 err = true;
00083 }
00084
00085 if (err)
00086 edm::LogWarning("RFIOStorageMaker")
00087 << "SiteLocalConfig Failed: SiteLocalConfigService is not loaded yet."
00088 << "Will use default 'castor' RFIO implementation.";
00089
00090 if (rfiotype.empty())
00091 rfiotype = "castor";
00092
00093
00094 putenv(const_cast<char *>("RFIO_TCP_NODELAY=1"));
00095
00096 RFIOPluginFactory::get()->create(rfiotype);
00097 Cthread_init();
00098 }
00099
00100 virtual Storage *open (const std::string &proto,
00101 const std::string &path,
00102 int mode)
00103 {
00104 StorageFactory *f = StorageFactory::get();
00105 StorageFactory::ReadHint readHint = f->readHint();
00106 StorageFactory::CacheHint cacheHint = f->cacheHint();
00107
00108 if (readHint == StorageFactory::READ_HINT_READAHEAD
00109 || cacheHint == StorageFactory::CACHE_HINT_STORAGE)
00110 mode &= ~IOFlags::OpenUnbuffered;
00111 else
00112 mode |= IOFlags::OpenUnbuffered;
00113
00114 Storage *file = new RFIOFile(normalise(path), mode);
00115 return f->wrapNonLocalFile(file, proto, std::string(), mode);
00116 }
00117
00118 virtual void stagein (const std::string &proto,
00119 const std::string &path)
00120 {
00121 std::string npath = normalise(path);
00122 size_t castor = npath.find("?path=/castor/");
00123 size_t rest = npath.find("&");
00124 if (proto != "rfio" || castor == std::string::npos)
00125 return;
00126
00127 castor += 6;
00128 size_t len = (rest == std::string::npos ? rest : rest-castor);
00129 std::string stagepath(npath, castor, len);
00130
00131 stage_options opts;
00132 opts.stage_host = getenv("STAGE_HOST");
00133 opts.service_class = getenv("STAGE_SVCCLASS");
00134 opts.stage_port = 0;
00135 opts.stage_version = 2;
00136
00137 stage_prepareToGet_filereq req;
00138 req.protocol = (char *) "rfio";
00139 req.filename = (char *) stagepath.c_str();
00140 req.priority = 0;
00141
00142 int nresp = 0;
00143 stage_prepareToGet_fileresp *resp = 0;
00144 int rc = stage_prepareToGet(0, &req, 1, &resp, &nresp, 0, &opts);
00145 if (rc < 0) {
00146 cms::Exception ex("FileStageInError");
00147 ex << "Error while staging in '" << stagepath
00148 << "', error was: " << rfio_serror()
00149 << " (serrno=" << serrno << ")";
00150 ex.addContext("Calling RFIOStorageMaker::stagein()");
00151 throw ex;
00152 }
00153
00154 if (nresp == 1 && resp->errorCode != 0) {
00155 cms::Exception ex("FileStageInError");
00156 ex << "Error while staging in '" << stagepath
00157 << "', stagein error was: " << resp->errorMessage
00158 << " (code=" << resp->errorCode << ")";
00159 ex.addContext("Calling RFIOStorageMaker::stagein()");
00160 throw ex;
00161 }
00162
00163 free(resp->filename);
00164 free(resp->errorMessage);
00165 free(resp);
00166 }
00167
00168 virtual bool check (const std::string &,
00169 const std::string &path,
00170 IOOffset *size = 0)
00171 {
00172 std::string npath = normalise(path);
00173 if (rfio_access(npath.c_str (), R_OK) != 0)
00174 return false;
00175
00176 if (size)
00177 {
00178 struct stat buf;
00179 if (rfio_stat64(npath.c_str (), &buf) != 0)
00180 return false;
00181
00182 *size = buf.st_size;
00183 }
00184
00185 return true;
00186 }
00187 };
00188
00189 DEFINE_EDM_PLUGIN (StorageMakerFactory, RFIOStorageMaker, "rfio");