CMS 3D CMS Logo

StatisticsSenderService.cc
Go to the documentation of this file.
1 
8 
9 #include <string>
10 #include <cmath>
11 
12 #include <unistd.h>
13 #include <fcntl.h>
14 
15 #include <openssl/x509.h>
16 #include <openssl/pem.h>
17 
// UPDATE_STATISTIC(x): store the local value `x` into the member `m_x`
// (e.g. read_single_bytes -> m_read_single_bytes). The expansion already ends in ';'.
18 #define UPDATE_STATISTIC(x) m_##x = x;
19 
// UPDATE_AND_OUTPUT_STATISTIC(x): write `"x":<x - m_x>, ` (the change since the values
// were last stored in m_x) to a stream named `os` that must exist in the calling scope,
// then store x into m_x via UPDATE_STATISTIC. It expands to two statements that each end
// in ';', so call sites omit the trailing semicolon, and it must never be used as the
// unbraced body of an if/else.
20 #define UPDATE_AND_OUTPUT_STATISTIC(x) \
21  os << "\"" #x "\":" << (x - m_##x) << ", "; \
22  UPDATE_STATISTIC(x)
23 
24 // Fallback HOST_NAME_MAX for platforms (e.g. macOS) whose headers do not define it,
25 // so that hostname buffers can be declared as fixed-size arrays.
// NOTE(review): POSIX's HOST_NAME_MAX does not count the terminating NUL; confirm that
// buffers sized with it reserve room for the terminator.
26 #ifndef HOST_NAME_MAX
27 #define HOST_NAME_MAX 128
28 #endif
29 
// Environment variables that may hold the job identifier reported as "app_info".
// JOB_UNIQUE_ID_ENV is consulted first and JOB_UNIQUE_ID_ENV_V2 is the fallback (see getJobID).
30 #define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID"
31 #define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId"
32 
33 using namespace edm::storage;
34 
36  : m_read_single_operations(0),
37  m_read_single_bytes(0),
38  m_read_single_square(0),
39  m_read_vector_operations(0),
40  m_read_vector_bytes(0),
41  m_read_vector_square(0),
42  m_read_vector_count_sum(0),
43  m_read_vector_count_square(0),
44  m_start_time(time(nullptr)) {}
45 
48  ssize_t read_single_operations = 0;
49  ssize_t read_single_bytes = 0;
50  ssize_t read_single_square = 0;
51  ssize_t read_vector_operations = 0;
52  ssize_t read_vector_bytes = 0;
53  ssize_t read_vector_square = 0;
54  ssize_t read_vector_count_sum = 0;
55  ssize_t read_vector_count_square = 0;
56  auto token = StorageAccount::tokenForStorageClassName("tstoragefile");
57  for (StorageAccount::StorageStats::const_iterator i = stats.begin(); i != stats.end(); ++i) {
58  if (i->first == token.value()) {
59  continue;
60  }
61  for (StorageAccount::OperationStats::const_iterator j = i->second.begin(); j != i->second.end(); ++j) {
62  if (j->first == static_cast<int>(StorageAccount::Operation::readv)) {
63  read_vector_operations += j->second.attempts;
64  read_vector_bytes += j->second.amount;
65  read_vector_count_square += j->second.vector_square;
66  read_vector_square += j->second.amount_square;
67  read_vector_count_sum += j->second.vector_count;
68  } else if (j->first == static_cast<int>(StorageAccount::Operation::read)) {
69  read_single_operations += j->second.attempts;
70  read_single_bytes += j->second.amount;
71  read_single_square += j->second.amount_square;
72  }
73  }
74  }
75  int64_t single_op_count = read_single_operations - m_read_single_operations;
76  if (single_op_count > 0) {
77  double single_sum = read_single_bytes - m_read_single_bytes;
78  double single_average = single_sum / static_cast<double>(single_op_count);
79  os << "\"read_single_sigma\":"
80  << sqrt((static_cast<double>(read_single_square - m_read_single_square) -
81  single_average * single_average * single_op_count) /
82  static_cast<double>(single_op_count))
83  << ", ";
84  os << "\"read_single_average\":" << single_average << ", ";
85  }
86  m_read_single_square = read_single_square;
87  int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
88  if (vector_op_count > 0) {
89  double vector_average =
90  static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
91  os << "\"read_vector_average\":" << vector_average << ", ";
92  os << "\"read_vector_sigma\":"
93  << sqrt((static_cast<double>(read_vector_square - m_read_vector_square) -
94  vector_average * vector_average * vector_op_count) /
95  static_cast<double>(vector_op_count))
96  << ", ";
97  double vector_count_average =
98  static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
99  os << "\"read_vector_count_average\":" << vector_count_average << ", ";
100  os << "\"read_vector_count_sigma\":"
101  << sqrt((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
102  vector_count_average * vector_count_average * vector_op_count) /
103  static_cast<double>(vector_op_count))
104  << ", ";
105  }
106  m_read_vector_square = read_vector_square;
107  m_read_vector_count_square = read_vector_count_square;
108  m_read_vector_count_sum = read_vector_count_sum;
109 
110  os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
111  << ", ";
112  os << "\"read_bytes_at_close\":"
113  << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
114 
115  // See top of file for macros; not complex, just avoiding copy/paste
116  UPDATE_AND_OUTPUT_STATISTIC(read_single_operations)
117  UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes)
118  UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations)
119  UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes)
120 
121  os << "\"start_time\":" << m_start_time << ", ";
122  m_start_time = time(nullptr);
123  // NOTE: last entry doesn't have the trailing comma.
124  os << "\"end_time\":" << m_start_time;
125 }
126 
128  : m_clienthost("unknown"),
129  m_clientdomain("unknown"),
130  m_serverhost("unknown"),
131  m_serverdomain("unknown"),
132  m_filelfn("unknown"),
133  m_filestats(),
134  m_guid(Guid().toString()),
135  m_counter(0),
136  m_size(-1),
137  m_userdn("unknown") {
140  if (!getX509Subject(m_userdn)) {
141  m_userdn = "unknown";
142  }
143 }
144 
146  const char *id = std::getenv(JOB_UNIQUE_ID_ENV);
147  // Dashboard developers requested that we migrate to this environment variable.
148  return id ? id : std::getenv(JOB_UNIQUE_ID_ENV_V2);
149 }
150 
152  size_t dot_pos = servername.find('.');
153  std::string serverhost;
154  std::string serverdomain;
155  if (dot_pos == std::string::npos) {
156  serverhost = servername.substr(0, servername.find(':'));
157  serverdomain = "unknown";
158  } else {
159  serverhost = servername.substr(0, dot_pos);
160  serverdomain = servername.substr(dot_pos + 1, servername.find(':') - dot_pos - 1);
161  if (serverdomain.empty()) {
162  serverdomain = "unknown";
163  }
164  }
165  {
166  std::lock_guard<std::mutex> sentry(m_servermutex);
167  m_serverhost = std::move(serverhost);
168  m_serverdomain = std::move(serverdomain);
169  }
170 }
171 
173 
// Handler invoked with the LFN of a file that is about to be closed.
//
// Records the LFN, builds the JSON statistics record with fillUDP() and sends it over a
// socket to the first address in the site's statistics destination list that accepts it.
// Returns early (nothing sent) when the SiteLocalConfig service is unavailable or has no
// statistics destination. Note: `pSLC` and `results` are declared on lines that this
// listing does not show.
//
// @param lfn          logical file name of the file being closed (stored in m_filelfn)
// @param usedFallback when true, the record carries "fallback": true
174 void StatisticsSenderService::filePreCloseEvent(std::string const &lfn, bool usedFallback) {
175  m_filelfn = lfn;
176 
178  if (!pSLC.isAvailable()) {
179  return;
180  }
181 
182  const struct addrinfo *addresses = pSLC->statisticsDestination();
183  if (!addresses) {
184  return;
185  }
186 
  // Site privacy setting: when the site restricts what is reported (non-empty set that
  // lacks "dn" or contains "nodn"), replace a known DN with "not reported". This
  // overwrites the member, so it also applies to every later record.
187  std::set<std::string> const *info = pSLC->statisticsInfo();
188  if (info && !info->empty() && (m_userdn != "unknown") &&
189  ((info->find("dn") == info->end()) || (info->find("nodn") != info->end()))) {
190  m_userdn = "not reported";
191  }
192 
194  fillUDP(pSLC->siteName(), usedFallback, results);
195 
  // Try each resolved address in order and stop after the first successful sendto().
  // The unique_ptr with a custom deleter closes the socket when each iteration ends.
196  for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
197  int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
198  if (sock < 0) {
199  continue;
200  }
201  auto close_del = [](int *iSocket) { close(*iSocket); };
202  std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
203  if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
204  break;
205  }
206  }
207 
  // Bump the counter whether or not any send succeeded; fillUDP() appends it to
  // m_guid to form the record's "unique_id".
208  m_counter++;
209 }
210 
212  char tmpName[HOST_NAME_MAX];
213  if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
214  // Sigh, no way to log errors from here.
215  m_clienthost = "unknown";
216  } else {
217  m_clienthost = tmpName;
218  }
219  size_t dot_pos = m_clienthost.find('.');
220  if (dot_pos == std::string::npos) {
221  m_clientdomain = "unknown";
222  } else {
223  m_clientdomain = m_clienthost.substr(dot_pos + 1, m_clienthost.size() - dot_pos - 1);
224  m_clienthost = m_clienthost.substr(0, dot_pos);
225  }
226 }
227 
228 void StatisticsSenderService::fillUDP(const std::string &siteName, bool usedFallback, std::string &udpinfo) {
229  std::ostringstream os;
230 
231  // Header - same for all IO accesses
232  os << "{";
233  if (!siteName.empty()) {
234  os << "\"site_name\":\"" << siteName << "\", ";
235  }
236  if (usedFallback) {
237  os << "\"fallback\": true, ";
238  }
239  std::string serverhost;
240  std::string serverdomain;
241  {
242  std::lock_guard<std::mutex> sentry(m_servermutex);
243  serverhost = m_serverhost;
244  serverdomain = m_serverdomain;
245  }
246 
247  os << "\"user_dn\":\"" << m_userdn << "\", ";
248  os << "\"client_host\":\"" << m_clienthost << "\", ";
249  os << "\"client_domain\":\"" << m_clientdomain << "\", ";
250  os << "\"server_host\":\"" << serverhost << "\", ";
251  os << "\"server_domain\":\"" << serverdomain << "\", ";
252  os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", ";
253  os << "\"file_lfn\":\"" << m_filelfn << "\", ";
254  // Dashboard devs requested that we send out no app_info if a job ID
255  // is not present in the environment.
256  const char *jobId = getJobID();
257  if (jobId) {
258  os << "\"app_info\":\"" << jobId << "\", ";
259  }
260 
261  if (m_size >= 0) {
262  os << "\"file_size\":" << m_size << ", ";
263  }
264 
265  m_filestats.fillUDP(os);
266 
267  os << "}";
268  udpinfo = os.str();
269 }
270 
271 /*
272  * Pull the X509 user subject from the environment.
273  * Based on initial code from the Frontier client:
274  * http://cdcvs.fnal.gov/cgi-bin/public-cvs/cvsweb-public.cgi/~checkout~/frontier/client/frontier.c?rev=1.57&content-type=text/plain
275  * This was further extended by walking up the returned chain similar to the Globus function
276  * globus_gsi_cert_utils-6.6/library/globus_gsi_cert_utils.c:globus_gsi_cert_utils_get_eec
277  * globus_gsi_credential-3.5/library/globus_gsi_credential.c:globus_gsi_cred_read_proxy_bio
278  */
279 
280 /*
281  * Given a stack of x509 proxies, take a guess at the EEC.
282  * Assumes the proxies are in reverse sorted order and looks for the first
283  * proxy which is not a substring of the prior proxy.
284  * THIS DOES NOT VERIFY THE RESULTS, and is a best-effort GUESS.
285  * Again, DO NOT REUSE THIS CODE THINKING IT VERIFIES THE CHAIN!
286  */
287 static X509 *findEEC(STACK_OF(X509) * certstack) {
288  int depth = sk_X509_num(certstack);
289  if (depth == 0) {
290  return nullptr;
291  }
292  int idx = depth - 1;
293  char *priorsubject = nullptr;
294  char *subject = nullptr;
295  X509 *x509cert = sk_X509_value(certstack, idx);
296  for (; x509cert && idx > 0; idx--) {
297  subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
298  if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
299  break;
300  }
301  x509cert = sk_X509_value(certstack, idx);
302  if (subject) {
303  OPENSSL_free(subject);
304  subject = nullptr;
305  }
306  }
307  if (subject) {
308  OPENSSL_free(subject);
309  subject = nullptr;
310  }
311  return x509cert;
312 }
313 
315  BIO *biof = nullptr;
316  STACK_OF(X509) *certs = nullptr;
317  char *subject = nullptr;
318  unsigned char *data = nullptr;
319  char *header = nullptr;
320  char *name = nullptr;
321  long len = 0U;
322 
323  if ((biof = BIO_new_file(filename.c_str(), "r"))) {
324  certs = sk_X509_new_null();
325  bool encountered_error = false;
326  while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
327  if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
328  X509 *tmp_cert = nullptr;
329  // See WARNINGS section in http://www.openssl.org/docs/crypto/d2i_X509.html
330  // Without this cmsRun crashes on a mac with a valid grid proxy.
331  const unsigned char *p;
332  p = data;
333  tmp_cert = d2i_X509(&tmp_cert, &p, len);
334  if (tmp_cert) {
335  sk_X509_push(certs, tmp_cert);
336  } else {
337  encountered_error = true;
338  }
339  } // Note we ignore any proxy key in the file.
340  if (data) {
341  OPENSSL_free(data);
342  data = nullptr;
343  }
344  if (header) {
345  OPENSSL_free(header);
346  header = nullptr;
347  }
348  if (name) {
349  OPENSSL_free(name);
350  name = nullptr;
351  }
352  }
353  X509 *x509cert = nullptr;
354  if (!encountered_error && sk_X509_num(certs)) {
355  x509cert = findEEC(certs);
356  }
357  if (x509cert) {
358  subject = X509_NAME_oneline(X509_get_subject_name(x509cert), nullptr, 0);
359  }
360  // Note we do not free x509cert directly, as it's still owned by the certs stack.
361  if (certs) {
362  sk_X509_pop_free(certs, X509_free);
363  x509cert = nullptr;
364  }
365  BIO_free(biof);
366  if (subject) {
367  result = subject;
368  OPENSSL_free(subject);
369  return true;
370  }
371  }
372  return false;
373 }
374 
376  char *filename = std::getenv("X509_USER_PROXY");
378  return true;
379  }
380  std::stringstream ss;
381  ss << "/tmp/x509up_u" << geteuid();
382  return getX509SubjectFromFile(ss.str(), result);
383 }
StorageAccount::StorageStats
tbb::concurrent_unordered_map< int, OperationStats > StorageStats
Definition: StorageAccount.h:119
mps_fire.i
i
Definition: mps_fire.py:428
edm::storage::StatisticsSenderService::m_filestats
FileStatistics m_filestats
Definition: StatisticsSenderService.h:52
edm::storage
Definition: StatisticsSenderService.h:15
edm::storage::StatisticsSenderService::m_serverhost
std::string m_serverhost
Definition: StatisticsSenderService.h:49
edm::storage::StatisticsSenderService::m_counter
size_t m_counter
Definition: StatisticsSenderService.h:54
X509
Definition: X509.py:1
edm::storage::StatisticsSenderService::m_servermutex
std::mutex m_servermutex
Definition: StatisticsSenderService.h:57
AlCaHLTBitMon_ParallelJobs.p
p
Definition: AlCaHLTBitMon_ParallelJobs.py:153
edm::storage::StatisticsSenderService::m_size
std::atomic< ssize_t > m_size
Definition: StatisticsSenderService.h:55
edm::storage::StatisticsSenderService::filePreCloseEvent
void filePreCloseEvent(std::string const &lfn, bool usedFallback)
Definition: StatisticsSenderService.cc:174
getX509SubjectFromFile
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
Definition: StatisticsSenderService.cc:314
protons_cff.time
time
Definition: protons_cff.py:39
bookConverter.results
results
Definition: bookConverter.py:144
info
static const TGPicture * info(bool iBackgroundIsBlack)
Definition: FWCollectionSummaryWidget.cc:153
edm::SiteLocalConfig::statisticsDestination
virtual struct addrinfo const * statisticsDestination() const =0
edm::SiteLocalConfig::siteName
virtual std::string const & siteName(void) const =0
edm::storage::StatisticsSenderService::getX509Subject
static bool getX509Subject(std::string &)
Definition: StatisticsSenderService.cc:375
edm::Service::isAvailable
bool isAvailable() const
Definition: Service.h:40
heavyIonCSV_trainingSettings.idx
idx
Definition: heavyIonCSV_trainingSettings.py:5
StorageAccount::Operation::readv
ActivityRegistry.h
edm::ActivityRegistry::watchPreCloseFile
void watchPreCloseFile(PreCloseFile::slot_type const &iSlot)
Definition: ActivityRegistry.h:235
edm::storage::StatisticsSenderService::m_serverdomain
std::string m_serverdomain
Definition: StatisticsSenderService.h:50
contentValuesCheck.ss
ss
Definition: contentValuesCheck.py:33
findEEC
static X509 * findEEC(STACK_OF(X509) *certstack)
Definition: StatisticsSenderService.cc:287
toolbox::toString
std::string toString(const char *format,...)
Definition: xdaq_compat.cc:4
edm::storage::StatisticsSenderService::FileStatistics::FileStatistics
FileStatistics()
Definition: StatisticsSenderService.cc:35
Service.h
edm::storage::StatisticsSenderService::m_guid
std::string m_guid
Definition: StatisticsSenderService.h:53
mathSSE::sqrt
T sqrt(T t)
Definition: SSEVec.h:19
edm::ActivityRegistry
Definition: ActivityRegistry.h:134
StorageAccount.h
corrVsCorr.filename
filename
Definition: corrVsCorr.py:123
LEDCalibrationChannels.depth
depth
Definition: LEDCalibrationChannels.py:65
mitigatedMETSequence_cff.U
U
Definition: mitigatedMETSequence_cff.py:36
AlCaHLTBitMon_QueryRunRegistry.string
string
Definition: AlCaHLTBitMon_QueryRunRegistry.py:256
edm::ParameterSet
Definition: ParameterSet.h:47
UPDATE_AND_OUTPUT_STATISTIC
#define UPDATE_AND_OUTPUT_STATISTIC(x)
Definition: StatisticsSenderService.cc:20
edm::storage::StatisticsSenderService::determineHostnames
void determineHostnames(void)
Definition: StatisticsSenderService.cc:211
edm::Service
Definition: Service.h:30
createfilelist.int
int
Definition: createfilelist.py:10
StorageAccount::tokenForStorageClassName
static StorageClassToken tokenForStorageClassName(std::string const &iName)
Definition: StorageAccount.cc:38
StatisticsSenderService.h
edm::storage::StatisticsSenderService::fillUDP
void fillUDP(const std::string &, bool, std::string &)
Definition: StatisticsSenderService.cc:228
edm::SiteLocalConfig::statisticsInfo
virtual std::set< std::string > const * statisticsInfo() const =0
StorageAccount::summary
static const StorageStats & summary(void)
Definition: StorageAccount.cc:95
dqmMemoryStats.stats
stats
Definition: dqmMemoryStats.py:134
JOB_UNIQUE_ID_ENV_V2
#define JOB_UNIQUE_ID_ENV_V2
Definition: StatisticsSenderService.cc:31
edm::Guid
Definition: Guid.h:26
cms::cuda::device::unique_ptr
std::unique_ptr< T, impl::DeviceDeleter > unique_ptr
Definition: device_unique_ptr.h:33
edm::storage::StatisticsSenderService::setSize
void setSize(size_t size)
Definition: StatisticsSenderService.cc:172
StorageAccount::Operation::read
JOB_UNIQUE_ID_ENV
#define JOB_UNIQUE_ID_ENV
Definition: StatisticsSenderService.cc:30
edm::storage::StatisticsSenderService::FileStatistics::fillUDP
void fillUDP(std::ostringstream &os)
Definition: StatisticsSenderService.cc:46
eostools.move
def move(src, dest)
Definition: eostools.py:511
edm::storage::StatisticsSenderService::getJobID
static const char * getJobID()
Definition: StatisticsSenderService.cc:145
edm::storage::StatisticsSenderService::m_clientdomain
std::string m_clientdomain
Definition: StatisticsSenderService.h:48
triggerObjects_cff.id
id
Definition: triggerObjects_cff.py:29
edm::storage::StatisticsSenderService::StatisticsSenderService
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
Definition: StatisticsSenderService.cc:127
Skims_PA_cff.name
name
Definition: Skims_PA_cff.py:17
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
edm::storage::StatisticsSenderService::m_userdn
std::string m_userdn
Definition: StatisticsSenderService.h:56
RecoTauValidation_cfi.header
header
Definition: RecoTauValidation_cfi.py:292
HOST_NAME_MAX
#define HOST_NAME_MAX
Definition: StatisticsSenderService.cc:27
mps_fire.result
result
Definition: mps_fire.py:311
edm::storage::StatisticsSenderService::m_filelfn
std::string m_filelfn
Definition: StatisticsSenderService.h:51
Guid.h
dqmiolumiharvest.j
j
Definition: dqmiolumiharvest.py:66
edm::storage::StatisticsSenderService::m_clienthost
std::string m_clienthost
Definition: StatisticsSenderService.h:47
edm::storage::StatisticsSenderService::setCurrentServer
void setCurrentServer(const std::string &servername)
Definition: StatisticsSenderService.cc:151
SiteLocalConfig.h
findQualityFiles.size
size
Write out results.
Definition: findQualityFiles.py:443
unpackBuffers-CaloStage2.token
token
Definition: unpackBuffers-CaloStage2.py:316