CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
StatisticsSenderService.cc
Go to the documentation of this file.
1 
8 
9 #include <string>
10 
11 #include <unistd.h>
12 #include <fcntl.h>
13 #include <string.h>
14 
15 #include <openssl/x509.h>
16 #include <openssl/pem.h>
17 
18 #define UPDATE_STATISTIC(x) \
19  m_ ## x = x;
20 
21 #define UPDATE_AND_OUTPUT_STATISTIC(x) \
22  os << "\"" #x "\":" << (x-m_ ## x) << ", "; \
23  UPDATE_STATISTIC(x)
24 
25 // Simple hack to define HOST_NAME_MAX on Mac.
26 // Allows arrays to be statically allocated
27 #ifndef HOST_NAME_MAX
28 #define HOST_NAME_MAX 128
29 #endif
30 
31 #define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID"
32 #define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId"
33 
34 using namespace edm::storage;
35 
37  m_read_single_operations(0),
38  m_read_single_bytes(0),
39  m_read_single_square(0),
40  m_read_vector_operations(0),
41  m_read_vector_bytes(0),
42  m_read_vector_square(0),
43  m_read_vector_count_sum(0),
44  m_read_vector_count_square(0),
45  m_start_time(time(NULL))
46 {}
47 
48 void
51  ssize_t read_single_operations = 0;
52  ssize_t read_single_bytes = 0;
53  ssize_t read_single_square = 0;
54  ssize_t read_vector_operations = 0;
55  ssize_t read_vector_bytes = 0;
56  ssize_t read_vector_square = 0;
57  ssize_t read_vector_count_sum = 0;
58  ssize_t read_vector_count_square = 0;
59  auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
60  for (StorageAccount::StorageStats::const_iterator i = stats.begin (); i != stats.end(); ++i) {
61  if (i->first == token.value()) {
62  continue;
63  }
64  for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
65  if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
66  read_vector_operations += j->second.attempts;
67  read_vector_bytes += j->second.amount;
68  read_vector_count_square += j->second.vector_square;
69  read_vector_square += j->second.amount_square;
70  read_vector_count_sum += j->second.vector_count;
71  } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
72  read_single_operations += j->second.attempts;
73  read_single_bytes += j->second.amount;
74  read_single_square += j->second.amount_square;
75  }
76  }
77  }
78  int64_t single_op_count = read_single_operations - m_read_single_operations;
79  if (single_op_count > 0) {
80  double single_sum = read_single_bytes-m_read_single_bytes;
81  double single_average = single_sum/static_cast<double>(single_op_count);
82  os << "\"read_single_sigma\":" << sqrt((static_cast<double>(read_single_square-m_read_single_square) - single_average*single_average*single_op_count)/static_cast<double>(single_op_count)) << ", ";
83  os << "\"read_single_average\":" << single_average << ", ";
84  }
85  m_read_single_square = read_single_square;
86  int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
87  if (vector_op_count > 0) {
88  double vector_average = static_cast<double>(read_vector_bytes-m_read_vector_bytes)/static_cast<double>(vector_op_count);
89  os << "\"read_vector_average\":" << vector_average << ", ";
90  os << "\"read_vector_sigma\":" << sqrt((static_cast<double>(read_vector_square-m_read_vector_square) - vector_average*vector_average*vector_op_count)/static_cast<double>(vector_op_count)) << ", ";
91  double vector_count_average = static_cast<double>(read_vector_count_sum-m_read_vector_count_sum)/static_cast<double>(vector_op_count);
92  os << "\"read_vector_count_average\":" << vector_count_average << ", ";
93  os << "\"read_vector_count_sigma\":" << sqrt((static_cast<double>(read_vector_count_square-m_read_vector_count_square) - vector_count_average*vector_count_average*vector_op_count)/static_cast<double>(vector_op_count)) << ", ";
94  }
95  m_read_vector_square = read_vector_square;
96  m_read_vector_count_square = read_vector_count_square;
97  m_read_vector_count_sum = read_vector_count_sum;
98 
99  os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
100  os << "\"read_bytes_at_close\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
101 
102  // See top of file for macros; not complex, just avoiding copy/paste
103  UPDATE_AND_OUTPUT_STATISTIC(read_single_operations)
104  UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes)
105  UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations)
106  UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes)
107 
108  os << "\"start_time\":" << m_start_time << ", ";
109  m_start_time = time(NULL);
110  // NOTE: last entry doesn't have the trailing comma.
111  os << "\"end_time\":" << m_start_time;
112 }
113 
115  m_clienthost("unknown"),
116  m_clientdomain("unknown"),
117  m_serverhost("unknown"),
118  m_serverdomain("unknown"),
119  m_filelfn("unknown"),
120  m_filestats(),
121  m_guid(Guid().toString()),
122  m_counter(0),
123  m_size(-1),
124  m_userdn("unknown")
125 {
128  if (!getX509Subject(m_userdn)) {
129  m_userdn = "unknown";
130  }
131 }
132 
133 const char *
135  const char * id = getenv(JOB_UNIQUE_ID_ENV);
136  // Dashboard developers requested that we migrate to this environment variable.
137  return id ? id : getenv(JOB_UNIQUE_ID_ENV_V2);
138 }
139 
140 void
142  size_t dot_pos = servername.find(".");
143  std::string serverhost;
144  std::string serverdomain;
145  if (dot_pos == std::string::npos) {
146  serverhost = servername.substr(0, servername.find(":"));
147  serverdomain = "unknown";
148  } else {
149  serverhost = servername.substr(0, dot_pos);
150  serverdomain = servername.substr(dot_pos+1, servername.find(":")-dot_pos-1);
151  if (serverdomain.empty()) {
152  serverdomain = "unknown";
153  }
154  }
155  {
156  std::lock_guard<std::mutex> sentry(m_servermutex);
157  m_serverhost = std::move(serverhost);
158  m_serverdomain = std::move(serverdomain);
159  }
160 }
161 
162 void
164  m_size = size;
165 }
166 
167 void
169  m_filelfn = lfn;
170 
172  if (!pSLC.isAvailable()) {
173  return;
174  }
175 
176  const struct addrinfo * addresses = pSLC->statisticsDestination();
177  if (!addresses) {
178  return;
179  }
180 
181  std::set<std::string> const * info = pSLC->statisticsInfo();
182  if (info && info->size() && (m_userdn != "unknown") && (
183  (info->find("dn") == info->end()) ||
184  (info->find("nodn") != info->end()))
185  )
186  {
187  m_userdn = "not reported";
188  }
189 
191  fillUDP(pSLC->siteName(), usedFallback, results);
192 
193  for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
194  int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
195  if (sock < 0) {
196  continue;
197  }
198  if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
199  break;
200  }
201  }
202 
203  m_counter++;
204 }
205 
206 void
208  char tmpName[HOST_NAME_MAX];
209  if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
210  // Sigh, no way to log errors from here.
211  m_clienthost = "unknown";
212  } else {
213  m_clienthost = tmpName;
214  }
215  size_t dot_pos = m_clienthost.find(".");
216  if (dot_pos == std::string::npos) {
217  m_clientdomain = "unknown";
218  } else {
219  m_clientdomain = m_clienthost.substr(dot_pos+1, m_clienthost.size()-dot_pos-1);
220  m_clienthost = m_clienthost.substr(0, dot_pos);
221  }
222 }
223 
224 void
225 StatisticsSenderService::fillUDP(const std::string& siteName, bool usedFallback, std::string &udpinfo) {
226  std::ostringstream os;
227 
228  // Header - same for all IO accesses
229  os << "{";
230  if (!siteName.empty()) {
231  os << "\"site_name\":\"" << siteName << "\", ";
232  }
233  if (usedFallback) {
234  os << "\"fallback\": true, ";
235  }
236  std::string serverhost;
237  std::string serverdomain;
238  {
239  std::lock_guard<std::mutex> sentry(m_servermutex);
240  serverhost = m_serverhost;
241  serverdomain = m_serverdomain;
242  }
243 
244  os << "\"user_dn\":\"" << m_userdn << "\", ";
245  os << "\"client_host\":\"" << m_clienthost << "\", ";
246  os << "\"client_domain\":\"" << m_clientdomain << "\", ";
247  os << "\"server_host\":\"" << serverhost << "\", ";
248  os << "\"server_domain\":\"" << serverdomain << "\", ";
249  os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", ";
250  os << "\"file_lfn\":\"" << m_filelfn << "\", ";
251  // Dashboard devs requested that we send out no app_info if a job ID
252  // is not present in the environment.
253  const char * jobId = getJobID();
254  if (jobId) {
255  os << "\"app_info\":\"" << jobId << "\", ";
256  }
257 
258  if (m_size >= 0) {
259  os << "\"file_size\":" << m_size << ", ";
260  }
261 
262  m_filestats.fillUDP(os);
263 
264  os << "}";
265  udpinfo = os.str();
266 }
267 
268 /*
269  * Pull the X509 user subject from the environment.
270  * Based on initial code from the Frontier client:
271  * http://cdcvs.fnal.gov/cgi-bin/public-cvs/cvsweb-public.cgi/~checkout~/frontier/client/frontier.c?rev=1.57&content-type=text/plain
272  * This was further extended by walking up the returned chain similar to the Globus function
273  * globus_gsi_cert_utils-6.6/library/globus_gsi_cert_utils.c:globus_gsi_cert_utils_get_eec
274  * globus_gsi_credential-3.5/library/globus_gsi_credential.c:globus_gsi_cred_read_proxy_bio
275  */
276 
277 /*
278  * Given a stack of x509 proxies, take a guess at the EEC.
279  * Assumes the proxies are in reverse sorted order and looks for the first
280  * proxy which is not a substring of the prior proxy.
281  * THIS DOES NOT VERIFY THE RESULTS, and is a best-effort GUESS.
282  * Again, DO NOT REUSE THIS CODE THINKING IT VERIFIES THE CHAIN!
283  */
284 static X509 * findEEC(STACK_OF(X509) * certstack) {
285  int depth = sk_X509_num(certstack);
286  if (depth == 0) {
287  return nullptr;
288  }
289  int idx = depth-1;
290  char *priorsubject = nullptr;
291  char *subject = nullptr;
292  X509 *x509cert = sk_X509_value(certstack, idx);
293  for (; x509cert && idx>0; idx--) {
294  subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
295  if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
296  break;
297  }
298  x509cert = sk_X509_value(certstack, idx);
299  if (subject) {
300  OPENSSL_free(subject);
301  subject = nullptr;
302  }
303  }
304  if (subject) {
305  OPENSSL_free(subject);
306  subject = nullptr;
307  }
308  return x509cert;
309 }
310 
311 static bool
313  BIO *biof = nullptr;
314  STACK_OF(X509) *certs = nullptr;
315  char *subject = nullptr;
316  unsigned char *data = nullptr;
317  char *header = nullptr;
318  char *name = nullptr;
319  long len = 0U;
320 
321  if((biof = BIO_new_file(filename.c_str(), "r"))) {
322 
323  certs = sk_X509_new_null();
324  bool encountered_error = false;
325  while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
326  if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
327  X509 * tmp_cert = nullptr;
328  // See WARNINGS section in http://www.openssl.org/docs/crypto/d2i_X509.html
329  // Without this cmsRun crashes on a mac with a valid grid proxy.
330  const unsigned char *p;
331  p=data;
332  tmp_cert = d2i_X509(&tmp_cert, &p, len);
333  if (tmp_cert) {
334  sk_X509_push(certs, tmp_cert);
335  } else {
336  encountered_error = true;
337  }
338  } // Note we ignore any proxy key in the file.
339  if (data) { OPENSSL_free(data); data = nullptr;}
340  if (header) { OPENSSL_free(header); header = nullptr;}
341  if (name) { OPENSSL_free(name); name = nullptr;}
342  }
343  X509 *x509cert = nullptr;
344  if (!encountered_error && sk_X509_num(certs)) {
345  x509cert = findEEC(certs);
346  }
347  if (x509cert) {
348  subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
349  }
350  // Note we do not free x509cert directly, as it's still owned by the certs stack.
351  if (certs) {
352  sk_X509_pop_free(certs, X509_free);
353  x509cert = nullptr;
354  }
355  BIO_free(biof);
356  if (subject) {
357  result = subject;
358  OPENSSL_free(subject);
359  return true;
360  }
361  }
362  return false;
363 }
364 
365 bool
367  char *filename = getenv("X509_USER_PROXY");
368  if (filename && getX509SubjectFromFile(filename, result)) {
369  return true;
370  }
371  std::stringstream ss;
372  ss << "/tmp/x509up_u" << geteuid();
373  return getX509SubjectFromFile(ss.str(), result);
374 }
void setCurrentServer(const std::string &servername)
int i
Definition: DBlmapReader.cc:9
static const TGPicture * info(bool iBackgroundIsBlack)
static const StorageStats & summary(void)
void filePreCloseEvent(std::string const &lfn, bool usedFallback)
#define NULL
Definition: scimark2.h:8
static X509 * findEEC(STACK_OF(X509)*certstack)
tuple result
Definition: mps_fire.py:95
tbb::concurrent_unordered_map< int, OperationStats > StorageStats
Definition: Guid.h:23
#define JOB_UNIQUE_ID_ENV_V2
static StorageClassToken tokenForStorageClassName(std::string const &iName)
T sqrt(T t)
Definition: SSEVec.h:18
#define JOB_UNIQUE_ID_ENV
std::string toString(const char *format,...)
Definition: xdaq_compat.cc:4
def move
Definition: eostools.py:510
bool isAvailable() const
Definition: Service.h:46
tuple results
Definition: mps_update.py:44
int j
Definition: DBlmapReader.cc:9
void fillUDP(const std::string &, bool, std::string &)
void watchPreCloseFile(PreCloseFile::slot_type const &iSlot)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
virtual std::string const & siteName(void) const =0
#define HOST_NAME_MAX
tuple filename
Definition: lut2db_cfg.py:20
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
virtual struct addrinfo const * statisticsDestination() const =0
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
tuple size
Write out results.
virtual std::set< std::string > const * statisticsInfo() const =0
#define UPDATE_AND_OUTPUT_STATISTIC(x)