CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
RFIOStorageMaker.cc
Go to the documentation of this file.
11 #include <cstdlib>
12 #include "shift/stager_api.h"
13 
15 {
20  {
22  // look for options
23  size_t suffix = path.find("?");
24  if (suffix == std::string::npos)
25  {
26  // convert old syntax to new but leave "host:/path" alone
27  suffix = 0;
28  if (path.find (":") == std::string::npos)
29  {
30  size_t e = path.find_first_not_of ("/");
31  if (e != std::string::npos)
32  {
33  size_t c = path.find("/castor/");
34  if ((c != std::string::npos) && (c == e-1))
35  {
36  // /castor/path -> rfio:///?path=/castor/path
37  suffix = c;
38  prefix = "rfio:///?path=";
39  }
40  else
41  {
42  c = path.find("/dpm/");
43  if ((c != std::string::npos) && (c == e-1))
44  {
45  // /dpm/path -> rfio:///dpm/path
46  suffix = c;
47  prefix = "rfio://";
48  }
49  }
50  }
51  }
52  }
53  else
54  {
55  // new syntax, leave alone except normalize host
56  prefix = path.substr(0, suffix);
57  size_t h = prefix.find_first_not_of('/');
58  size_t s = prefix.find_last_not_of('/');
59  prefix.resize(s+1);
60  prefix.replace(0,h,"rfio://");
61  prefix += '/';
62  }
63 
64  return prefix + path.substr(suffix);
65  }
66 
67 public:
69  {
70  std::string rfiotype("");
71  bool err = false;
72  try
73  {
75  if (!siteconfig.isAvailable())
76  err = true;
77  else
78  rfiotype = siteconfig->rfioType();
79  }
80  catch (const cms::Exception &e)
81  {
82  err = true;
83  }
84 
85  if (err)
86  edm::LogWarning("RFIOStorageMaker")
87  << "SiteLocalConfig Failed: SiteLocalConfigService is not loaded yet."
88  << "Will use default 'castor' RFIO implementation.";
89 
90  if (rfiotype.empty())
91  rfiotype = "castor";
92 
93  // Force Castor to move control messages out of client TCP buffers faster.
94  putenv(const_cast<char *>("RFIO_TCP_NODELAY=1"));
95 
96  RFIOPluginFactory::get()->create(rfiotype);
97  Cthread_init();
98  }
99 
100  virtual Storage *open (const std::string &proto,
101  const std::string &path,
102  int mode) override
103  {
105  StorageFactory::ReadHint readHint = f->readHint();
106  StorageFactory::CacheHint cacheHint = f->cacheHint();
107 
109  || cacheHint == StorageFactory::CACHE_HINT_STORAGE)
110  mode &= ~IOFlags::OpenUnbuffered;
111  else
112  mode |= IOFlags::OpenUnbuffered;
113 
114  Storage *file = new RFIOFile(normalise(path), mode);
115  return f->wrapNonLocalFile(file, proto, std::string(), mode);
116  }
117 
118  virtual void stagein (const std::string &proto,
119  const std::string &path) override
120  {
121  std::string npath = normalise(path);
122  size_t castor = npath.find("?path=/castor/");
123  size_t rest = npath.find("&");
124  if (proto != "rfio" || castor == std::string::npos)
125  return;
126 
127  castor += 6;
128  size_t len = (rest == std::string::npos ? rest : rest-castor);
129  std::string stagepath(npath, castor, len);
130 
131  stage_options opts;
132  opts.stage_host = getenv("STAGE_HOST");
133  opts.service_class = getenv("STAGE_SVCCLASS");
134  opts.stage_port = 0;
135  opts.stage_version = 2;
136 
137  stage_prepareToGet_filereq req;
138  req.protocol = (char *) "rfio";
139  req.filename = (char *) stagepath.c_str();
140  req.priority = 0;
141 
142  int nresp = 0;
143  stage_prepareToGet_fileresp *resp = 0;
144  int rc = stage_prepareToGet(0, &req, 1, &resp, &nresp, 0, &opts);
145  if (rc < 0) {
146  cms::Exception ex("FileStageInError");
147  ex << "Error while staging in '" << stagepath
148  << "', error was: " << rfio_serror()
149  << " (serrno=" << serrno << ")";
150  ex.addContext("Calling RFIOStorageMaker::stagein()");
151  throw ex;
152  }
153 
154  if (nresp == 1 && resp->errorCode != 0) {
155  cms::Exception ex("FileStageInError");
156  ex << "Error while staging in '" << stagepath
157  << "', stagein error was: " << resp->errorMessage
158  << " (code=" << resp->errorCode << ")";
159  ex.addContext("Calling RFIOStorageMaker::stagein()");
160  throw ex;
161  }
162 
163  free(resp->filename);
164  free(resp->errorMessage);
165  free(resp);
166  }
167 
168  virtual bool check (const std::string &/*proto*/,
169  const std::string &path,
170  IOOffset *size = 0) override
171  {
172  std::string npath = normalise(path);
173  if (rfio_access(npath.c_str (), R_OK) != 0)
174  return false;
175 
176  if (size)
177  {
178  struct stat buf;
179  if (rfio_stat64(npath.c_str (), &buf) != 0)
180  return false;
181 
182  *size = buf.st_size;
183  }
184 
185  return true;
186  }
187 };
188 
CacheHint cacheHint(void) const
Storage * wrapNonLocalFile(Storage *s, const std::string &proto, const std::string &path, int mode)
static StorageFactory * get(void)
int Cthread_init(void)
virtual bool check(const std::string &, const std::string &path, IOOffset *size=0) override
int rfio_access(const char *filepath, int mode)
#define serrno
Definition: RFIO.h:10
virtual Storage * open(const std::string &proto, const std::string &path, int mode) override
std::string normalise(const std::string &path)
bool isAvailable() const
Definition: Service.h:46
double f[11][100]
virtual void stagein(const std::string &proto, const std::string &path) override
char * rfio_serror()
The Signals That Services Can Subscribe To This is based on ActivityRegistry h
Helper function to determine trigger accepts.
Definition: Activities.doc:4
ReadHint readHint(void) const
void addContext(std::string const &context)
Definition: Exception.cc:227
int64_t IOOffset
Definition: IOTypes.h:19
#define DEFINE_EDM_PLUGIN(factory, type, name)
virtual std::string const rfioType(void) const =0
int rfio_stat64(const char *path, struct stat *statbuf)
tuple size
Write out results.
T get(const Candidate &c)
Definition: component.h:55