CMS 3D CMS Logo

StatisticsSenderService.cc
Go to the documentation of this file.
1 
10 
11 #include <string>
12 #include <cmath>
13 
14 #include <unistd.h>
15 #include <fcntl.h>
16 
17 #include <openssl/x509.h>
18 #include <openssl/pem.h>
19 
20 #define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", ";
21 
22 // Simple hack to define HOST_NAME_MAX on Mac.
23 // Allows arrays to be statically allocated
24 #ifndef HOST_NAME_MAX
25 #define HOST_NAME_MAX 128
26 #endif
27 
28 static constexpr char const *const JOB_UNIQUE_ID_ENV = "CRAB_UNIQUE_JOB_ID";
29 static constexpr char const *const JOB_UNIQUE_ID_ENV_V2 = "DashboardJobId";
30 
31 using namespace edm::storage;
32 
34  : m_read_single_operations(0),
35  m_read_single_bytes(0),
36  m_read_single_square(0),
37  m_read_vector_operations(0),
38  m_read_vector_bytes(0),
39  m_read_vector_square(0),
40  m_read_vector_count_sum(0),
41  m_read_vector_count_square(0),
42  m_start_time(time(nullptr)) {}
43 
44 void StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) const {
46  ssize_t read_single_operations = 0;
47  ssize_t read_single_bytes = 0;
48  ssize_t read_single_square = 0;
49  ssize_t read_vector_operations = 0;
50  ssize_t read_vector_bytes = 0;
51  ssize_t read_vector_square = 0;
52  ssize_t read_vector_count_sum = 0;
53  ssize_t read_vector_count_square = 0;
54  auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
55  for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
56  if (i->first == token.value()) {
57  continue;
58  }
59  for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
60  if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
61  read_vector_operations += j->second.attempts;
62  read_vector_bytes += j->second.amount;
63  read_vector_count_square += j->second.vector_square;
64  read_vector_square += j->second.amount_square;
65  read_vector_count_sum += j->second.vector_count;
66  } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
67  read_single_operations += j->second.attempts;
68  read_single_bytes += j->second.amount;
69  read_single_square += j->second.amount_square;
70  }
71  }
72  }
73  int64_t single_op_count = read_single_operations - m_read_single_operations;
74  if (single_op_count > 0) {
75  double single_sum = read_single_bytes - m_read_single_bytes;
76  double single_average = single_sum / static_cast<double>(single_op_count);
77  os << "\"read_single_sigma\":"
78  << sqrt(std::abs((static_cast<double>(read_single_square - m_read_single_square) -
79  single_average * single_average * single_op_count) /
80  static_cast<double>(single_op_count)))
81  << ", ";
82  os << "\"read_single_average\":" << single_average << ", ";
83  }
84  int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
85  if (vector_op_count > 0) {
86  double vector_average =
87  static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
88  os << "\"read_vector_average\":" << vector_average << ", ";
89  os << "\"read_vector_sigma\":"
90  << sqrt(std::abs((static_cast<double>(read_vector_square - m_read_vector_square) -
91  vector_average * vector_average * vector_op_count) /
92  static_cast<double>(vector_op_count)))
93  << ", ";
94  double vector_count_average =
95  static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
96  os << "\"read_vector_count_average\":" << vector_count_average << ", ";
97  os << "\"read_vector_count_sigma\":"
98  << sqrt(std::abs((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
99  vector_count_average * vector_count_average * vector_op_count) /
100  static_cast<double>(vector_op_count)))
101  << ", ";
102  }
103 
104  os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
105  << ", ";
106  os << "\"read_bytes_at_close\":"
107  << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
108 
109  // See top of file for macros; not complex, just avoiding copy/paste
110  OUTPUT_STATISTIC(read_single_operations)
111  OUTPUT_STATISTIC(read_single_bytes)
112  OUTPUT_STATISTIC(read_vector_operations)
113  OUTPUT_STATISTIC(read_vector_bytes)
114 
115  os << "\"start_time\":" << m_start_time << ", ";
116  // NOTE: last entry doesn't have the trailing comma.
117  os << "\"end_time\":" << time(nullptr);
118 }
119 
122  ssize_t read_single_operations = 0;
123  ssize_t read_single_bytes = 0;
124  ssize_t read_single_square = 0;
125  ssize_t read_vector_operations = 0;
126  ssize_t read_vector_bytes = 0;
127  ssize_t read_vector_square = 0;
128  ssize_t read_vector_count_sum = 0;
129  ssize_t read_vector_count_square = 0;
130  auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
131  for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
132  if (i->first == token.value()) {
133  continue;
134  }
135  for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
136  if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
137  read_vector_operations += j->second.attempts;
138  read_vector_bytes += j->second.amount;
139  read_vector_count_square += j->second.vector_square;
140  read_vector_square += j->second.amount_square;
141  read_vector_count_sum += j->second.vector_count;
142  } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
143  read_single_operations += j->second.attempts;
144  read_single_bytes += j->second.amount;
145  read_single_square += j->second.amount_square;
146  }
147  }
148  }
149 
150  m_read_single_square = read_single_square;
151  m_read_vector_square = read_vector_square;
152  m_read_vector_count_square = read_vector_count_square;
153  m_read_vector_count_sum = read_vector_count_sum;
154  m_read_single_operations = read_single_operations;
155  m_read_single_bytes = read_single_bytes;
156  m_read_vector_operations = read_vector_operations;
157  m_read_vector_bytes = read_vector_bytes;
158  m_start_time = time(nullptr);
159 }
161  : m_filelfn(iLFN),
162  m_serverhost("unknown"),
163  m_serverdomain("unknown"),
164  m_type(iType),
165  m_size(-1),
166  m_id(0),
167  m_openCount(1) {}
168 
170  : m_clienthost("unknown"),
171  m_clientdomain("unknown"),
172  m_filestats(),
174  m_counter(0),
175  m_userdn("unknown"),
176  m_debug(iPSet.getUntrackedParameter<bool>("debug", false)) {
179  if (!getX509Subject(m_userdn)) {
180  m_userdn = "unknown";
181  }
182 }
183 
185  const char *id = std::getenv(JOB_UNIQUE_ID_ENV);
186  // Dashboard developers requested that we migrate to this environment variable.
187  return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2);
188 }
189 
191  auto found = m_urlToLfn.find(iURL);
192  if (found != m_urlToLfn.end()) {
193  return &found->second;
194  }
195  for (auto const &v : m_lfnToFileInfo) {
196  if (v.first.size() < iURL.size()) {
197  if (v.first == iURL.substr(iURL.size() - v.first.size())) {
198  m_urlToLfn.emplace(iURL, v.first);
199  return &m_urlToLfn.find(iURL)->second;
200  }
201  }
202  }
203  //does the lfn have a protocol and the iURL not?
204  if (std::string::npos == iURL.find(':')) {
205  for (auto const &v : m_lfnToFileInfo) {
206  if ((std::string::npos != v.first.find(':')) and (v.first.size() > iURL.size())) {
207  if (iURL == v.first.substr(v.first.size() - iURL.size())) {
208  m_urlToLfn.emplace(iURL, v.first);
209  return &m_urlToLfn.find(iURL)->second;
210  }
211  }
212  }
213  }
214 
215  return nullptr;
216 }
217 
219  size_t dot_pos = servername.find('.');
220  std::string serverhost;
221  std::string serverdomain;
222  if (dot_pos == std::string::npos) {
223  serverhost = servername.substr(0, servername.find(':'));
224  serverdomain = "unknown";
225  } else {
226  serverhost = servername.substr(0, dot_pos);
227  serverdomain = servername.substr(dot_pos + 1, servername.find(':') - dot_pos - 1);
228  if (serverdomain.empty()) {
229  serverdomain = "unknown";
230  }
231  }
232  {
233  auto lfn = matchedLfn(url);
234  std::lock_guard<std::mutex> sentry(m_servermutex);
235  if (nullptr != lfn) {
236  auto found = m_lfnToFileInfo.find(*lfn);
237  if (found != m_lfnToFileInfo.end()) {
238  found->second.m_serverhost = std::move(serverhost);
239  found->second.m_serverdomain = std::move(serverdomain);
240  }
241  } else if (m_debug) {
242  edm::LogWarning("StatisticsSenderService") << "setCurrentServer: unknown url name " << url << "\n";
243  }
244  }
245 }
246 
248  m_urlToLfn.emplace(lfn, lfn);
249  auto attempt = m_lfnToFileInfo.emplace(lfn, FileInfo{lfn, type});
250  if (attempt.second) {
251  attempt.first->second.m_size = size;
252  attempt.first->second.m_id = m_counter++;
253  edm::LogInfo("StatisticsSenderService") << "openingFile: opening " << lfn << "\n";
254  } else {
255  ++(attempt.first->second.m_openCount);
256  edm::LogInfo("StatisticsSenderService") << "openingFile: re-opening" << lfn << "\n";
257  }
258 }
259 
260 void StatisticsSenderService::closedFile(std::string const &url, bool usedFallback) {
262  if (!pSLC.isAvailable()) {
263  return;
264  }
265 
266  const struct addrinfo *addresses = pSLC->statisticsDestination();
267  if (!addresses and !m_debug) {
268  return;
269  }
270 
271  std::set<std::string> const *info = pSLC->statisticsInfo();
272  if (info && !info->empty() && (m_userdn != "unknown") &&
273  ((info->find("dn") == info->end()) || (info->find("nodn") != info->end()))) {
274  m_userdn = "not reported";
275  }
276 
277  auto lfn = matchedLfn(url);
278  if (nullptr != lfn) {
279  auto found = m_lfnToFileInfo.find(*lfn);
280  assert(found != m_lfnToFileInfo.end());
281 
283  fillUDP(pSLC->siteName(), found->second, usedFallback, results);
284  if (m_debug) {
285  edm::LogSystem("StatisticSenderService") << "\n" << results << "\n";
286  }
287 
288  for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
289  int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
290  if (sock < 0) {
291  continue;
292  }
293  auto close_del = [](int *iSocket) { close(*iSocket); };
294  std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
295  if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
296  break;
297  }
298  }
299 
300  auto c = --found->second.m_openCount;
301  if (m_debug) {
302  if (c == 0) {
303  edm::LogWarning("StatisticsSenderService") << "fully closed: " << *lfn << "\n";
304  } else {
305  edm::LogWarning("StatisticsSenderService") << "partially closed: " << *lfn << "\n";
306  }
307  }
308  } else if (m_debug) {
309  edm::LogWarning("StatisticsSenderService") << "closed: unknown url name " << url << "\n";
310  }
311 }
312 
314  //remove entries with openCount of 0
315  bool moreToTest = false;
316  do {
317  moreToTest = false;
318  for (auto it = m_lfnToFileInfo.begin(); it != m_lfnToFileInfo.end(); ++it) {
319  if (it->second.m_openCount == 0) {
320  auto lfn = it->first;
321  bool moreToTest2 = false;
322  do {
323  moreToTest2 = false;
324  for (auto it2 = m_urlToLfn.begin(); it2 != m_urlToLfn.end(); ++it2) {
325  if (it2->second == lfn) {
326  m_urlToLfn.unsafe_erase(it2);
327  moreToTest2 = true;
328  break;
329  }
330  }
331  } while (moreToTest2);
332 
333  m_lfnToFileInfo.unsafe_erase(it);
334  moreToTest = true;
335  break;
336  }
337  }
338  } while (moreToTest);
339 }
340 
342  auto lfn = matchedLfn(url);
343  if (nullptr != lfn) {
344  auto itFound = m_lfnToFileInfo.find(*lfn);
345  if (itFound != m_lfnToFileInfo.end()) {
346  itFound->second.m_size = size;
347  }
348  } else if (m_debug) {
349  edm::LogWarning("StatisticsSenderService") << "setSize: unknown url name " << url << "\n";
350  }
351 }
352 
354  //we are at a sync point in the framwework so no new files are being opened
355  cleanupOldFiles();
357 }
358 
360  char tmpName[HOST_NAME_MAX];
361  if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
362  // Sigh, no way to log errors from here.
363  m_clienthost = "unknown";
364  } else {
365  m_clienthost = tmpName;
366  }
367  size_t dot_pos = m_clienthost.find('.');
368  if (dot_pos == std::string::npos) {
369  m_clientdomain = "unknown";
370  } else {
371  m_clientdomain = m_clienthost.substr(dot_pos + 1, m_clienthost.size() - dot_pos - 1);
372  m_clienthost = m_clienthost.substr(0, dot_pos);
373  }
374 }
375 
377  const FileInfo &fileinfo,
378  bool usedFallback,
379  std::string &udpinfo) const {
380  std::ostringstream os;
381 
382  // Header - same for all IO accesses
383  os << "{";
384  if (!siteName.empty()) {
385  os << "\"site_name\":\"" << siteName << "\", ";
386  }
387  // edm::getReleaseVersion() returns a string that includes quotation
388  // marks, therefore they are not added here
389  os << "\"cmssw_version\":" << edm::getReleaseVersion() << ", ";
390  if (usedFallback) {
391  os << "\"fallback\": true, ";
392  } else {
393  os << "\"fallback\": false, ";
394  }
395  os << "\"read_type\": ";
396  switch (fileinfo.m_type) {
398  os << "\"primary\", ";
399  break;
400  }
402  os << "\"secondary\", ";
403  break;
404  }
406  os << "\"embedded\", ";
407  break;
408  }
409  }
410  auto serverhost = fileinfo.m_serverhost;
411  auto serverdomain = fileinfo.m_serverdomain;
412 
413  os << "\"user_dn\":\"" << m_userdn << "\", ";
414  os << "\"client_host\":\"" << m_clienthost << "\", ";
415  os << "\"client_domain\":\"" << m_clientdomain << "\", ";
416  os << "\"server_host\":\"" << serverhost << "\", ";
417  os << "\"server_domain\":\"" << serverdomain << "\", ";
418  os << "\"unique_id\":\"" << m_guid << "-" << fileinfo.m_id << "\", ";
419  os << "\"file_lfn\":\"" << fileinfo.m_filelfn << "\", ";
420  // Dashboard devs requested that we send out no app_info if a job ID
421  // is not present in the environment.
422  const char *jobId = getJobID();
423  if (jobId) {
424  os << "\"app_info\":\"" << jobId << "\", ";
425  }
426 
427  if (fileinfo.m_size >= 0) {
428  os << "\"file_size\":" << fileinfo.m_size << ", ";
429  }
430 
431  m_filestats.fillUDP(os);
432 
433  os << "}";
434  udpinfo = os.str();
435 }
436 
437 /*
438  * Pull the X509 user subject from the environment.
439  * Based on initial code from the Frontier client:
440  * http://cdcvs.fnal.gov/cgi-bin/public-cvs/cvsweb-public.cgi/~checkout~/frontier/client/frontier.c?rev=1.57&content-type=text/plain
441  * This was further extended by walking up the returned chain similar to the Globus function
442  * globus_gsi_cert_utils-6.6/library/globus_gsi_cert_utils.c:globus_gsi_cert_utils_get_eec
443  * globus_gsi_credential-3.5/library/globus_gsi_credential.c:globus_gsi_cred_read_proxy_bio
444  */
445 
446 /*
447  * Given a stack of x509 proxies, take a guess at the EEC.
448  * Assumes the proxies are in reverse sorted order and looks for the first
449  * proxy which is not a substring of the prior proxy.
450  * THIS DOES NOT VERIFY THE RESULTS, and is a best-effort GUESS.
451  * Again, DO NOT REUSE THIS CODE THINKING IT VERIFIES THE CHAIN!
452  */
453 static X509 *findEEC(STACK_OF(X509) * certstack) {
454  int depth = sk_X509_num(certstack);
455  if (depth == 0) {
456  return nullptr;
457  }
458  int idx = depth - 1;
459  char *priorsubject = nullptr;
460  char *subject = nullptr;
461  X509 *x509cert = sk_X509_value(certstack, idx);
462  for (; x509cert && idx > 0; idx--) {
463  subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
464  if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
465  break;
466  }
467  x509cert = sk_X509_value(certstack, idx);
468  if (subject) {
469  OPENSSL_free(subject);
470  subject = nullptr;
471  }
472  }
473  if (subject) {
474  OPENSSL_free(subject);
475  subject = nullptr;
476  }
477  return x509cert;
478 }
479 
481  BIO *biof = nullptr;
482  STACK_OF(X509) *certs = nullptr;
483  char *subject = nullptr;
484  unsigned char *data = nullptr;
485  char *header = nullptr;
486  char *name = nullptr;
487  long len = 0U;
488 
489  if ((biof = BIO_new_file(filename.c_str(), "r"))) {
490  certs = sk_X509_new_null();
491  bool encountered_error = false;
492  while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
493  if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
494  X509 *tmp_cert = nullptr;
495  // See WARNINGS section in http://www.openssl.org/docs/crypto/d2i_X509.html
496  // Without this cmsRun crashes on a mac with a valid grid proxy.
497  const unsigned char *p;
498  p = data;
499  tmp_cert = d2i_X509(&tmp_cert, &p, len);
500  if (tmp_cert) {
501  sk_X509_push(certs, tmp_cert);
502  } else {
503  encountered_error = true;
504  }
505  } // Note we ignore any proxy key in the file.
506  if (data) {
507  OPENSSL_free(data);
508  data = nullptr;
509  }
510  if (header) {
511  OPENSSL_free(header);
512  header = nullptr;
513  }
514  if (name) {
515  OPENSSL_free(name);
516  name = nullptr;
517  }
518  }
519  X509 *x509cert = nullptr;
520  if (!encountered_error && sk_X509_num(certs)) {
521  x509cert = findEEC(certs);
522  }
523  if (x509cert) {
524  subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
525  }
526  // Note we do not free x509cert directly, as it's still owned by the certs stack.
527  if (certs) {
528  sk_X509_pop_free(certs, X509_free);
529  x509cert = nullptr;
530  }
531  BIO_free(biof);
532  if (subject) {
533  result = subject;
534  OPENSSL_free(subject);
535  return true;
536  }
537  }
538  return false;
539 }
540 
542  char *filename = std::getenv("X509_USER_PROXY");
544  return true;
545  }
546  std::stringstream ss;
547  ss << "/tmp/x509up_u" << geteuid();
548  return getX509SubjectFromFile(ss.str(), result);
549 }
size
Write out results.
Log< level::System, false > LogSystem
InputType
Definition: InputType.h:5
static const TGPicture * info(bool iBackgroundIsBlack)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
Definition: X509.py:1
void setSize(const std::string &urlOrLfn, size_t size)
oneapi::tbb::concurrent_unordered_map< int, OperationStats > StorageStats
#define OUTPUT_STATISTIC(x)
void fillUDP(const std::string &site, const FileInfo &fileinfo, bool, std::string &) const
static constexpr char const *const JOB_UNIQUE_ID_ENV_V2
Guid const & processGUID()
Definition: processGUID.cc:4
assert(be >=bs)
std::string const * matchedLfn(std::string const &iURL)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
T sqrt(T t)
Definition: SSEVec.h:19
std::string toString(const char *format,...)
Definition: xdaq_compat.cc:4
Abs< T >::type abs(const T &t)
Definition: Abs.h:22
static constexpr char const *const JOB_UNIQUE_ID_ENV
oneapi::tbb::concurrent_unordered_map< std::string, FileInfo > m_lfnToFileInfo
void openingFile(std::string const &lfn, edm::InputType type, size_t size=-1)
std::string getReleaseVersion()
Log< level::Info, false > LogInfo
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static X509 * findEEC(STACK_OF(X509) *certstack)
void closedFile(std::string const &lfn, bool usedFallback)
static const StorageStats & summary(void)
virtual std::string const & siteName(void) const =0
oneapi::tbb::concurrent_unordered_map< std::string, std::string > m_urlToLfn
HLT enums.
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
#define HOST_NAME_MAX
FileInfo(std::string const &iLFN, edm::InputType)
results
Definition: mysort.py:8
void filePostCloseEvent(std::string const &lfn)
bool isAvailable() const
Definition: Service.h:40
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
Log< level::Warning, false > LogWarning
virtual struct addrinfo const * statisticsDestination() const =0
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
virtual std::set< std::string > const * statisticsInfo() const =0
def move(src, dest)
Definition: eostools.py:511