CMS 3D CMS Logo

StatisticsSenderService.cc
Go to the documentation of this file.
1 
9 
10 #include <string>
11 #include <cmath>
12 
13 #include <unistd.h>
14 #include <fcntl.h>
15 
16 #include <openssl/x509.h>
17 #include <openssl/pem.h>
18 
// Streams `"<x>":<x - m_<x>>, ` into a stream named `os`: the stringized argument becomes the JSON key
// and the value is the variable `x` minus the member `m_x`. The expanding scope must provide `os`,
// `x` and `m_x`. The expansion already ends in a semicolon, so uses are written without one.
19 #define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", ";
20 
21 // Simple hack to define HOST_NAME_MAX on Mac.
22 // Allows arrays to be statically allocated
23 #ifndef HOST_NAME_MAX
24 #define HOST_NAME_MAX 128
25 #endif
26 
// Names of the environment variables read when looking up the job identifier. In the lookup code
// further down in this file, JOB_UNIQUE_ID_ENV is read first and JOB_UNIQUE_ID_ENV_V2 is read only
// when the first one is not set.
27 static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID";
28 static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId";
29 
30 using namespace edm::storage;
31 
// Member-initializer list and empty body of a constructor whose signature line is absent from this
// listing (presumably the FileStatistics default constructor — confirm against the header).
// Every baseline counter starts at zero and the start time is the current time(nullptr).
33  : m_read_single_operations(0),
34  m_read_single_bytes(0),
35  m_read_single_square(0),
36  m_read_vector_operations(0),
37  m_read_vector_bytes(0),
38  m_read_vector_square(0),
39  m_read_vector_count_sum(0),
40  m_read_vector_count_square(0),
41  m_start_time(time(nullptr)) {}
42 
43 void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const {
    // Appends read statistics to `os` as `"key":value, ` pairs; the surrounding braces are not
    // written here. Each reported value is the total accumulated below minus the baseline stored
    // in the matching m_* member.
    // NOTE(review): `stats` is used below but its declaration is not visible (listing line 44 is
    // absent) — presumably it comes from StorageAccount::summary(); confirm.
45  ssize_t read_single_operations = 0;
46  ssize_t read_single_bytes = 0;
47  ssize_t read_single_square = 0;
48  ssize_t read_vector_operations = 0;
49  ssize_t read_vector_bytes = 0;
50  ssize_t read_vector_square = 0;
51  ssize_t read_vector_count_sum = 0;
52  ssize_t read_vector_count_square = 0;
    // Sum the `read` and `readv` counters over every storage class except the one whose token
    // matches "tstoragefile", which is skipped.
53  auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
54  for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
55  if (i->first == token.value()) {
56  continue;
57  }
58  for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
59  if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
60  read_vector_operations += j->second.attempts;
61  read_vector_bytes += j->second.amount;
62  read_vector_count_square += j->second.vector_square;
63  read_vector_square += j->second.amount_square;
64  read_vector_count_sum += j->second.vector_count;
65  } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
66  read_single_operations += j->second.attempts;
67  read_single_bytes += j->second.amount;
68  read_single_square += j->second.amount_square;
69  }
70  }
71  }
    // Single-read average and sigma are emitted only when at least one new single read happened.
    // sigma = sqrt(|sum_of_squares - average^2 * n| / n); std::abs keeps the sqrt argument
    // non-negative.
72  int64_t single_op_count = read_single_operations - m_read_single_operations;
73  if (single_op_count > 0) {
74  double single_sum = read_single_bytes - m_read_single_bytes;
75  double single_average = single_sum / static_cast<double>(single_op_count);
76  os << "\"read_single_sigma\":"
77  << sqrt(std::abs((static_cast<double>(read_single_square - m_read_single_square) -
78  single_average * single_average * single_op_count) /
79  static_cast<double>(single_op_count)))
80  << ", ";
81  os << "\"read_single_average\":" << single_average << ", ";
82  }
    // Same average/sigma computation for vector reads: once for the bytes per readv call and
    // once for the vector_count per readv call.
83  int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
84  if (vector_op_count > 0) {
85  double vector_average =
86  static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
87  os << "\"read_vector_average\":" << vector_average << ", ";
88  os << "\"read_vector_sigma\":"
89  << sqrt(std::abs((static_cast<double>(read_vector_square - m_read_vector_square) -
90  vector_average * vector_average * vector_op_count) /
91  static_cast<double>(vector_op_count)))
92  << ", ";
93  double vector_count_average =
94  static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
95  os << "\"read_vector_count_average\":" << vector_count_average << ", ";
96  os << "\"read_vector_count_sigma\":"
97  << sqrt(std::abs((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
98  vector_count_average * vector_count_average * vector_op_count) /
99  static_cast<double>(vector_op_count)))
100  << ", ";
101  }
102 
     // NOTE: "read_bytes" and "read_bytes_at_close" are produced from the identical expression,
     // so they always carry the same value here.
103  os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
104  << ", ";
105  os << "\"read_bytes_at_close\":"
106  << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
107 
108  // See top of file for macros; not complex, just avoiding copy/paste
109  OUTPUT_STATISTIC(read_single_operations)
110  OUTPUT_STATISTIC(read_single_bytes)
111  OUTPUT_STATISTIC(read_vector_operations)
112  OUTPUT_STATISTIC(read_vector_bytes)
113 
114  os << "\"start_time\":" << m_start_time << ", ";
115  // NOTE: last entry doesn't have the trailing comma.
116  os << "\"end_time\":" << time(nullptr);
117 }
118 
// Body of a function whose signature line is absent from this listing (presumably
// FileStatistics::update() — confirm). It re-accumulates the `read`/`readv` totals, skipping the
// "tstoragefile" storage class, and stores them in the m_* members as the new baselines.
// It also resets m_start_time to the current time.
// NOTE(review): `stats` is used but not declared in the visible lines — presumably
// StorageAccount::summary(); confirm.
121  ssize_t read_single_operations = 0;
122  ssize_t read_single_bytes = 0;
123  ssize_t read_single_square = 0;
124  ssize_t read_vector_operations = 0;
125  ssize_t read_vector_bytes = 0;
126  ssize_t read_vector_square = 0;
127  ssize_t read_vector_count_sum = 0;
128  ssize_t read_vector_count_square = 0;
129  auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
130  for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
131  if (i->first == token.value()) {
132  continue;
133  }
134  for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
135  if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
136  read_vector_operations += j->second.attempts;
137  read_vector_bytes += j->second.amount;
138  read_vector_count_square += j->second.vector_square;
139  read_vector_square += j->second.amount_square;
140  read_vector_count_sum += j->second.vector_count;
141  } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
142  read_single_operations += j->second.attempts;
143  read_single_bytes += j->second.amount;
144  read_single_square += j->second.amount_square;
145  }
146  }
147  }
148 
     // Store the freshly accumulated totals as the baselines for later differences.
149  m_read_single_square = read_single_square;
150  m_read_vector_square = read_vector_square;
151  m_read_vector_count_square = read_vector_count_square;
152  m_read_vector_count_sum = read_vector_count_sum;
153  m_read_single_operations = read_single_operations;
154  m_read_single_bytes = read_single_bytes;
155  m_read_vector_operations = read_vector_operations;
156  m_read_vector_bytes = read_vector_bytes;
157  m_start_time = time(nullptr);
158 }
// Member-initializer list and empty body of a constructor whose signature line is absent from this
// listing (presumably FileInfo(std::string const &iLFN, edm::InputType iType) — confirm).
// The server host and domain start as "unknown", the size as -1, the id as 0 and the open count as 1.
160  : m_filelfn(iLFN),
161  m_serverhost("unknown"),
162  m_serverdomain("unknown"),
163  m_type(iType),
164  m_size(-1),
165  m_id(0),
166  m_openCount(1) {}
167 
// Member-initializer list and body of a constructor whose signature line is absent from this
// listing (presumably the StatisticsSenderService constructor — confirm against the header).
// The client host, client domain and user DN default to "unknown". m_guid is taken from
// Guid().toString(), and m_debug from the untracked boolean parameter "debug" (default false).
169  : m_clienthost("unknown"),
170  m_clientdomain("unknown"),
171  m_filestats(),
172  m_guid(Guid().toString()),
173  m_counter(0),
174  m_userdn("unknown"),
175  m_debug(iPSet.getUntrackedParameter<bool>("debug", false)) {
     // NOTE(review): listing lines 176-177 are absent here; their content cannot be seen.
     // If no X509 subject can be obtained, the DN is reset to "unknown".
178  if (!getX509Subject(m_userdn)) {
179  m_userdn = "unknown";
180  }
181 }
182 
// Body of a function whose signature line is absent from this listing (presumably getJobID —
// confirm). It returns the value of the JOB_UNIQUE_ID_ENV environment variable when that is set.
// Otherwise it returns the value of JOB_UNIQUE_ID_ENV_V2, which is null when that is unset too.
184  const char *id = getenv(JOB_UNIQUE_ID_ENV);
185  // Dashboard developers requested that we migrate to this environment variable.
186  return id ? id : getenv(JOB_UNIQUE_ID_ENV_V2);
187 }
188 
// Body of a function whose signature line is absent from this listing (presumably
// matchedLfn(std::string const &iURL) — confirm). It resolves iURL to a known LFN and returns a
// pointer to the LFN string stored in m_urlToLfn, or nullptr when nothing matches.
// Successful suffix matches are cached in m_urlToLfn.
     // 1) Exact hit in the url -> lfn cache.
190  auto found = m_urlToLfn.find(iURL);
191  if (found != m_urlToLfn.end()) {
192  return &found->second;
193  }
     // 2) A known LFN that is strictly shorter than iURL and equal to the tail of iURL.
194  for (auto const &v : m_lfnToFileInfo) {
195  if (v.first.size() < iURL.size()) {
196  if (v.first == iURL.substr(iURL.size() - v.first.size())) {
197  m_urlToLfn.emplace(iURL, v.first);
198  return &m_urlToLfn.find(iURL)->second;
199  }
200  }
201  }
     // 3) iURL contains no ':' — look for a longer LFN that does contain ':' and ends with iURL.
202  //does the lfn have a protocol and the iURL not?
203  if (std::string::npos == iURL.find(':')) {
204  for (auto const &v : m_lfnToFileInfo) {
205  if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) {
206  if (iURL == v.first.substr(v.first.size() - iURL.size())) {
207  m_urlToLfn.emplace(iURL, v.first);
208  return &m_urlToLfn.find(iURL)->second;
209  }
210  }
211  }
212  }
213 
214  return nullptr;
215 }
216 
// Body of a function whose signature line is absent from this listing (presumably
// setCurrentServer(const std::string &url, const std::string &servername) — confirm).
// It splits `servername` into a host and a domain and stores both on the FileInfo matched by `url`.
     // With no '.', the host is everything before the first ':' and the domain is "unknown".
     // Otherwise the host is the text before the first '.', and the domain is the text between
     // that '.' and the first ':' (or the end of the string when there is no ':', because substr
     // clamps the length). An empty domain becomes "unknown".
218  size_t dot_pos = servername.find('.');
219  std::string serverhost;
220  std::string serverdomain;
221  if (dot_pos == std::string::npos) {
222  serverhost = servername.substr(0, servername.find(":"));
223  serverdomain = "unknown";
224  } else {
225  serverhost = servername.substr(0, dot_pos);
226  serverdomain = servername.substr(dot_pos + 1, servername.find(":") - dot_pos - 1);
227  if (serverdomain.empty()) {
228  serverdomain = "unknown";
229  }
230  }
231  {
     // Note: matchedLfn() runs before m_servermutex is taken; only the FileInfo update and the
     // debug warning are inside the locked region.
232  auto lfn = matchedLfn(url);
233  std::lock_guard<std::mutex> sentry(m_servermutex);
234  if (nullptr != lfn) {
235  auto found = m_lfnToFileInfo.find(*lfn);
236  if (found != m_lfnToFileInfo.end()) {
237  found->second.m_serverhost = std::move(serverhost);
238  found->second.m_serverdomain = std::move(serverdomain);
239  }
240  } else if (m_debug) {
241  edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n";
242  }
243  }
244 }
245 
// Body of a function whose signature line is absent from this listing (presumably
// openingFile(std::string const &lfn, edm::InputType type, size_t size) — confirm).
// It registers `lfn` as its own url -> lfn mapping and creates its FileInfo on first open.
// On first open the size and a fresh id (m_counter++) are recorded; otherwise only the
// existing entry's open count is incremented.
247  m_urlToLfn.emplace(lfn, lfn);
248  auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type});
     // attempt.second is true only when the emplace actually inserted a new entry.
249  if (attempt.second) {
250  attempt.first->second.m_size = size;
251  attempt.first->second.m_id = m_counter++;
252  edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n";
253  } else {
254  ++(attempt.first->second.m_openCount);
255  edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n";
256  }
257 }
258 
259 void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) {
     // Builds the statistics record for the file matched by `url`, sends it as one datagram to the
     // first destination address that accepts it, and decrements that file's open count.
     // Nothing is done when the service behind pSLC is unavailable, or when there is no
     // destination and debug is off.
     // NOTE(review): `pSLC` is used but not declared in the visible lines (listing line 260 is
     // absent) — presumably an edm::Service handle to the site-local configuration; confirm.
261  if (!pSLC.isAvailable()) {
262  return;
263  }
264 
265  const struct addrinfo *addresses = pSLC->statisticsDestination();
266  if (!addresses and !m_debug) {
267  return;
268  }
269 
     // The user DN is replaced by "not reported" when the info set is non-empty and either lacks
     // "dn" or contains "nodn". This overwrites the member, so it persists for later calls.
270  std::set<std::string> const *info = pSLC->statisticsInfo();
271  if (info && !info->empty() && (m_userdn != "unknown") &&
272  ((info->find("dn") == info->end()) || (info->find("nodn") != info->end()))) {
273  m_userdn = "not reported";
274  }
275 
276  auto lfn = matchedLfn(url);
277  if (nullptr != lfn) {
278  auto found = m_lfnToFileInfo.find(*lfn);
279  assert(found != m_lfnToFileInfo.end());
280 
281  std::string results;
282  fillUDP(pSLC->siteName(), found->second, usedFallback, results);
     // NOTE(review): the log category below is spelled "StatisticSenderService" (no 's' after
     // "Statistic"), unlike the other messages in this file — confirm whether that is intended.
283  if (m_debug) {
284  edm::LogSystem("StatisticSenderService") << "\n" << results << "\n";
285  }
286 
     // Try each address in turn. A failed socket() moves on to the next address, and the first
     // sendto() returning >= 0 ends the loop. `guard` closes the socket at the end of every
     // iteration, including the one that breaks out.
287  for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
288  int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
289  if (sock < 0) {
290  continue;
291  }
292  auto close_del = [](int *iSocket) { close(*iSocket); };
293  std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
294  if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
295  break;
296  }
297  }
298 
     // The entry is only counted down here; it is not erased in this function.
299  auto c = --found->second.m_openCount;
300  if (m_debug) {
301  if (c == 0) {
302  edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n";
303  } else {
304  edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n";
305  }
306  }
307  } else if (m_debug) {
308  edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n";
309  }
310 }
311 
// Body of a function whose signature line is absent from this listing (presumably
// cleanupOldFiles() — confirm). It erases every m_lfnToFileInfo entry whose m_openCount is 0,
// together with every m_urlToLfn entry that maps to that LFN.
// After each unsafe_erase the corresponding scan restarts from begin() (flag + break) —
// presumably so that an iterator is never used after the erase; confirm.
313  //remove entries with openCount of 0
314  bool moreToTest = false;
315  do {
316  moreToTest = false;
317  for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) {
318  if (it->second.m_openCount == 0) {
     // Copy the key: it is still needed while the url map is being purged.
319  auto lfn = it->first;
320  bool moreToTest2 = false;
321  do {
322  moreToTest2 = false;
323  for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) {
324  if (it2->second == lfn) {
325  m_urlToLfn.unsafe_erase(it2);
326  moreToTest2 = true;
327  break;
328  }
329  }
330  } while (moreToTest2);
331 
332  m_lfnToFileInfo.unsafe_erase(it);
333  moreToTest = true;
334  break;
335  }
336  }
337  } while (moreToTest);
338 }
339 
// Body of a function whose signature line is absent from this listing (presumably
// setSize(const std::string &url, size_t size) — confirm). It stores `size` on the FileInfo
// matched by `url`. When no LFN matches, nothing is stored and a warning is logged only in
// debug mode.
341  auto lfn = matchedLfn(url);
342  if (nullptr != lfn) {
343  auto itFound = m_lfnToFileInfo.find(*lfn);
344  if (itFound != m_lfnToFileInfo.end()) {
345  itFound->second.m_size = size;
346  }
347  } else if (m_debug) {
348  edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n";
349  }
350 }
351 
352 void StatisticsSenderService::filePostCloseEvent(std::string const &lfn, bool usedFallback) {
     // Post-close hook. In the lines visible here it only calls cleanupOldFiles(); neither
     // parameter is used in those lines.
353  //we are at a sync point in the framework so no new files are being opened
354  cleanupOldFiles();
     // NOTE(review): listing line 355 is absent here; its content cannot be seen.
356 }
357 
// Body of a function whose signature line is absent from this listing. It fills m_clienthost and
// m_clientdomain from gethostname(). The name is split at the first '.': the part before it is
// the host and the part after it is the domain. The domain is "unknown" when there is no '.',
// and the host is "unknown" when gethostname() fails.
359  char tmpName[HOST_NAME_MAX];
     // NOTE(review): POSIX leaves null-termination unspecified when gethostname() truncates the
     // name; the buffer is exactly HOST_NAME_MAX bytes with no explicit terminator — confirm
     // whether a HOST_NAME_MAX + 1 buffer with forced termination is needed.
360  if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
361  // Sigh, no way to log errors from here.
362  m_clienthost = "unknown";
363  } else {
364  m_clienthost = tmpName;
365  }
366  size_t dot_pos = m_clienthost.find(".");
367  if (dot_pos == std::string::npos) {
368  m_clientdomain = "unknown";
369  } else {
     // The domain is extracted first because the next statement truncates m_clienthost.
370  m_clientdomain = m_clienthost.substr(dot_pos + 1, m_clienthost.size() - dot_pos - 1);
371  m_clienthost = m_clienthost.substr(0, dot_pos);
372  }
373 }
374 
376  const FileInfo &fileinfo,
377  bool usedFallback,
378  std::string &udpinfo) const {
     // Remainder of a definition whose first signature line is absent from this listing
     // (presumably StatisticsSenderService::fillUDP(const std::string &siteName, ...) — confirm).
     // It builds one JSON object describing the access to `fileinfo` and stores it in `udpinfo`.
     // String values are streamed between quotes exactly as they are; no JSON escaping is
     // applied in this function.
379  std::ostringstream os;
380 
381  // Header - same for all IO accesses
382  os << "{";
383  if (!siteName.empty()) {
384  os << "\"site_name\":\"" << siteName << "\", ";
385  }
386  if (usedFallback) {
387  os << "\"fallback\": true, ";
388  } else {
389  os << "\"fallback\": false, ";
390  }
391  os << "\"type\": ";
     // NOTE(review): the case labels of this switch (listing lines 393, 397 and 401) are absent
     // from this listing. Judging by the strings written, there is one case per input type —
     // confirm against the edm::InputType enumerators.
392  switch (fileinfo.m_type) {
394  os << "\"primary\", ";
395  break;
396  }
398  os << "\"secondary\", ";
399  break;
400  }
402  os << "\"embedded\", ";
403  break;
404  }
405  }
406  auto serverhost = fileinfo.m_serverhost;
407  auto serverdomain = fileinfo.m_serverdomain;
408 
409  os << "\"user_dn\":\"" << m_userdn << "\", ";
410  os << "\"client_host\":\"" << m_clienthost << "\", ";
411  os << "\"client_domain\":\"" << m_clientdomain << "\", ";
412  os << "\"server_host\":\"" << serverhost << "\", ";
413  os << "\"server_domain\":\"" << serverdomain << "\", ";
     // unique_id is the service-wide GUID followed by '-' and the per-file id.
414  os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", ";
415  os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", ";
416  // Dashboard devs requested that we send out no app_info if a job ID
417  // is not present in the environment.
418  const char *jobId = getJobID();
419  if (jobId) {
420  os << "\"app_info\":\"" << jobId << "\", ";
421  }
422 
     // file_size is omitted while the size is still negative.
423  if (fileinfo.m_size >= 0) {
424  os << "\"file_size\":" << fileinfo.m_size << ", ";
425  }
426 
     // The read statistics go last; their final entry is written without a trailing comma.
427  m_filestats.fillUDP(os);
428 
429  os << "}";
430  udpinfo = os.str();
431 }
432 
433 /*
434  * Pull the X509 user subject from the environment.
435  * Based on initial code from the Frontier client:
436  * http://cdcvs.fnal.gov/cgi-bin/public-cvs/cvsweb-public.cgi/~checkout~/frontier/client/frontier.c?rev=1.57&content-type=text/plain
437  * This was further extended by walking up the returned chain similar to the Globus function
438  * globus_gsi_cert_utils-6.6/library/globus_gsi_cert_utils.c:globus_gsi_cert_utils_get_eec
439  * globus_gsi_credential-3.5/library/globus_gsi_credential.c:globus_gsi_cred_read_proxy_bio
440  */
441 
442 /*
443  * Given a stack of x509 proxies, take a guess at the EEC.
444  * Assumes the proxies are in reverse sorted order and looks for the first
445  * proxy which is not a substring of the prior proxy.
446  * THIS DOES NOT VERIFY THE RESULTS, and is a best-effort GUESS.
447  * Again, DO NOT REUSE THIS CODE THINKING IT VERIFIES THE CHAIN!
448  */
449 static X509 *findEEC(STACK_OF(X509) * certstack) {
     // Returns a pointer to one element of `certstack` (the stack keeps ownership), or nullptr
     // when the stack is empty.
     // NOTE(review): `priorsubject` is initialised to nullptr and never assigned in this
     // function, so the `break` below — which requires a non-null priorsubject — cannot be taken.
     // In addition, sk_X509_value() inside the loop runs before idx is decremented, so the first
     // iteration re-fetches the element it started with. As written, a stack of two or more
     // non-null entries yields the element at index 1 and a single-entry stack yields index 0.
     // Confirm whether this matches the intended "walk up the chain" behaviour before changing it.
450  int depth = sk_X509_num(certstack);
451  if (depth == 0) {
452  return nullptr;
453  }
454  int idx = depth - 1;
455  char *priorsubject = nullptr;
456  char *subject = nullptr;
457  X509 *x509cert = sk_X509_value(certstack, idx);
458  for (; x509cert && idx > 0; idx--) {
     // With a null buffer argument, the string returned by X509_NAME_oneline is released with
     // OPENSSL_free below.
459  subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
460  if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
461  break;
462  }
463  x509cert = sk_X509_value(certstack, idx);
464  if (subject) {
465  OPENSSL_free(subject);
466  subject = nullptr;
467  }
468  }
     // Releases the subject string that would still be held after leaving the loop via `break`.
469  if (subject) {
470  OPENSSL_free(subject);
471  subject = nullptr;
472  }
473  return x509cert;
474 }
475 
// Body of a function whose signature line is absent from this listing (presumably
// static bool getX509SubjectFromFile(const std::string &filename, std::string &result) — confirm).
// It reads the PEM blocks in `filename`, collects the certificates, picks one with findEEC() and
// writes its one-line subject into `result`.
// It returns true only when a subject was obtained. It returns false when the file cannot be
// opened, a certificate fails to decode, or no certificate is found; `result` is left unchanged
// in those cases.
477  BIO *biof = nullptr;
478  STACK_OF(X509) *certs = nullptr;
479  char *subject = nullptr;
480  unsigned char *data = nullptr;
481  char *header = nullptr;
482  char *name = nullptr;
483  long len = 0U;
484 
485  if ((biof = BIO_new_file(filename.c_str(), "r"))) {
486  certs = sk_X509_new_null();
487  bool encountered_error = false;
     // One PEM block per iteration. Only blocks named as X509 certificates are decoded.
     // The name, header and data buffers of every block are freed at the end of the iteration.
488  while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
489  if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
490  X509 *tmp_cert = nullptr;
491  // See WARNINGS section in http://www.openssl.org/docs/crypto/d2i_X509.html
492  // Without this cmsRun crashes on a mac with a valid grid proxy.
     // d2i_X509 receives the copy `p` rather than `data`, so `data` itself is left unchanged
     // for the OPENSSL_free below.
493  const unsigned char *p;
494  p = data;
495  tmp_cert = d2i_X509(&tmp_cert, &p, len);
496  if (tmp_cert) {
497  sk_X509_push(certs, tmp_cert);
498  } else {
499  encountered_error = true;
500  }
501  } // Note we ignore any proxy key in the file.
502  if (data) {
503  OPENSSL_free(data);
504  data = nullptr;
505  }
506  if (header) {
507  OPENSSL_free(header);
508  header = nullptr;
509  }
510  if (name) {
511  OPENSSL_free(name);
512  name = nullptr;
513  }
514  }
515  X509 *x509cert = nullptr;
516  if (!encountered_error && sk_X509_num(certs)) {
517  x509cert = findEEC(certs);
518  }
     // The subject string is produced before the stack, and the certificates in it, are freed.
519  if (x509cert) {
520  subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
521  }
522  // Note we do not free x509cert directly, as it's still owned by the certs stack.
523  if (certs) {
524  sk_X509_pop_free(certs, X509_free);
525  x509cert = nullptr;
526  }
527  BIO_free(biof);
528  if (subject) {
529  result = subject;
530  OPENSSL_free(subject);
531  return true;
532  }
533  }
534  return false;
535 }
536 
// Body of a function whose signature line is absent from this listing (presumably
// getX509Subject(std::string &result) — confirm). It first tries the file named by the
// X509_USER_PROXY environment variable. If that is unset or yields no subject, it falls back
// to /tmp/x509up_u<effective uid>.
// It returns the result of whichever lookup was used last.
538  char *filename = getenv("X509_USER_PROXY");
539  if (filename && getX509SubjectFromFile(filename, result)) {
540  return true;
541  }
542  std::stringstream ss;
543  ss << "/tmp/x509up_u" << geteuid();
544  return getX509SubjectFromFile(ss.str(), result);
545 }
size
Write out results.
type
Definition: HCALResponse.h:21
virtual struct addrinfo const * statisticsDestination() const =0
InputType
Definition: InputType.h:5
static const TGPicture * info(bool iBackgroundIsBlack)
static char const *const JOB_UNIQUE_ID_ENV_V2
Definition: X509.py:1
void setSize(const std::string &urlOrLfn, size_t size)
#define OUTPUT_STATISTIC(x)
#define nullptr
static const StorageStats & summary(void)
void fillUDP(const std::string &site, const FileInfo &fileinfo, bool, std::string &) const
virtual std::set< std::string > const * statisticsInfo() const =0
tbb::concurrent_unordered_map< std::string, std::string > m_urlToLfn
static X509 * findEEC(STACK_OF(X509)*certstack)
static char const *const JOB_UNIQUE_ID_ENV
tbb::concurrent_unordered_map< int, OperationStats > StorageStats
Definition: Guid.h:23
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
virtual std::string const & siteName(void) const =0
T sqrt(T t)
Definition: SSEVec.h:18
std::string const * matchedLfn(std::string const &iURL)
bool isAvailable() const
Definition: Service.h:40
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
std::string toString(const std::pair< T, T > &aT)
Definition: CaloEllipse.h:72
void openingFile(std::string const &lfn, edm::InputType type, size_t size=-1)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
tbb::concurrent_unordered_map< std::string, FileInfo > m_lfnToFileInfo
void closedFile(std::string const &lfn, bool usedFallback)
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
#define HOST_NAME_MAX
FileInfo(std::string const &iLFN, edm::InputType)
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
def move(src, dest)
Definition: eostools.py:511
#define constexpr
void filePostCloseEvent(std::string const &lfn, bool usedFallback)