15 #include <openssl/x509.h>
16 #include <openssl/pem.h>
18 #define UPDATE_STATISTIC(x) \
21 #define UPDATE_AND_OUTPUT_STATISTIC(x) \
22 os << "\"" #x "\":" << (x-m_ ## x) << ", "; \
28 #define HOST_NAME_MAX 128
// NOTE(review): presumably the names of environment variables that carry the
// job identifier (going by the macro names) — confirm at the use site.
31 #define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID"
32 #define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId"
34 using namespace edm::storage;
// Every stored read counter starts at zero.
37 m_read_single_operations(0),
38 m_read_single_bytes(0),
39 m_read_single_square(0),
40 m_read_vector_operations(0),
41 m_read_vector_bytes(0),
42 m_read_vector_square(0),
43 m_read_vector_count_sum(0),
44 m_read_vector_count_square(0),
// Totals accumulated by the loops below; each one starts at zero.
51 ssize_t read_single_operations = 0;
52 ssize_t read_single_bytes = 0;
53 ssize_t read_single_square = 0;
54 ssize_t read_vector_operations = 0;
55 ssize_t read_vector_bytes = 0;
56 ssize_t read_vector_square = 0;
57 ssize_t read_vector_count_sum = 0;
58 ssize_t read_vector_count_square = 0;
// Scan the storage statistics and handle only entries whose key equals token.value().
60 for (StorageAccount::StorageStats::const_iterator
i = stats.begin ();
i != stats.end(); ++
i) {
61 if (
i->first == token.value()) {
// Visit every per-operation record stored under that key.
64 for (StorageAccount::OperationStats::const_iterator
j =
i->second.begin();
j !=
i->second.end(); ++
j) {
// Vector-read totals: attempts, amount, vector_square, amount_square and vector_count are summed.
// NOTE(review): the test that decides between these vector totals and the
// single-read totals further down is not visible here — confirm.
66 read_vector_operations +=
j->second.attempts;
67 read_vector_bytes +=
j->second.amount;
68 read_vector_count_square +=
j->second.vector_square;
69 read_vector_square +=
j->second.amount_square;
70 read_vector_count_sum +=
j->second.vector_count;
// Single-read totals: attempts, amount and amount_square are summed.
72 read_single_operations +=
j->second.attempts;
73 read_single_bytes +=
j->second.amount;
74 read_single_square +=
j->second.amount_square;
// Single reads counted relative to the stored m_read_single_operations value.
78 int64_t single_op_count = read_single_operations - m_read_single_operations;
// Average and sigma are written only when at least one single read was counted,
// which also keeps the divisions below away from zero.
79 if (single_op_count > 0) {
80 double single_sum = read_single_bytes-m_read_single_bytes;
81 double single_average = single_sum/
static_cast<double>(single_op_count);
// Sigma is sqrt((delta of the square totals - n * average^2) / n), with n = single_op_count.
82 os <<
"\"read_single_sigma\":" <<
sqrt((static_cast<double>(read_single_square-m_read_single_square) - single_average*single_average*single_op_count)/static_cast<double>(single_op_count)) <<
", ";
83 os <<
"\"read_single_average\":" << single_average <<
", ";
// Keep the current square total in the member; the sigma expression above subtracts it.
85 m_read_single_square = read_single_square;
// Vector reads counted relative to the stored m_read_vector_operations value.
86 int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
// The vector statistics are written only when at least one vector read was counted,
// which also keeps the divisions below away from zero.
87 if (vector_op_count > 0) {
// Average bytes per vector read.
88 double vector_average =
static_cast<double>(read_vector_bytes-m_read_vector_bytes)/static_cast<double>(vector_op_count);
89 os <<
"\"read_vector_average\":" << vector_average <<
", ";
// Sigma is sqrt((delta of the square totals - n * average^2) / n), with n = vector_op_count.
90 os <<
"\"read_vector_sigma\":" <<
sqrt((static_cast<double>(read_vector_square-m_read_vector_square) - vector_average*vector_average*vector_op_count)/static_cast<double>(vector_op_count)) <<
", ";
// Same average/sigma pair, computed from the vector_count totals instead of the byte totals.
91 double vector_count_average =
static_cast<double>(read_vector_count_sum-m_read_vector_count_sum)/static_cast<double>(vector_op_count);
92 os <<
"\"read_vector_count_average\":" << vector_count_average <<
", ";
93 os <<
"\"read_vector_count_sigma\":" <<
sqrt((static_cast<double>(read_vector_count_square-m_read_vector_count_square) - vector_count_average*vector_count_average*vector_op_count)/static_cast<double>(vector_op_count)) <<
", ";
// Keep the current totals in the members; the expressions above subtract them.
95 m_read_vector_square = read_vector_square;
96 m_read_vector_count_square = read_vector_count_square;
97 m_read_vector_count_sum = read_vector_count_sum;
// Bytes from vector and single reads combined, relative to the stored m_ byte counters.
99 os <<
"\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) <<
", ";
// Writes the same expression as "read_bytes" above under a second key.
100 os <<
"\"read_bytes_at_close\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) <<
", ";
108 os <<
"\"start_time\":" << m_start_time <<
", ";
// Unlike the fields above, this one is written without a trailing ", ".
// NOTE(review): end_time is written from m_start_time, the same member used for
// start_time — confirm that it is reassigned between the two writes.
111 os <<
"\"end_time\":" << m_start_time;
// Index of the first "." in servername, or std::string::npos when there is none.
142 size_t dot_pos = servername.find(
".");
143 if (dot_pos == std::string::npos) {
// No dot: the host is everything before the first ":" (the whole string when there is no ":").
144 m_serverhost = servername.substr(0, servername.find(
":"));
// Takes the text after dot_pos up to the first ":". When there is no ":" the
// count is very large and substr stops at the end of the string.
148 m_serverdomain = servername.substr(dot_pos+1, servername.find(
":")-dot_pos-1);
// Condition: info is non-null and non-empty, m_userdn is not "unknown", and info
// either has no "dn" entry or has a "nodn" entry.
// NOTE(review): the action taken when this holds is not visible here — confirm.
175 if (info && info->size() && (
m_userdn !=
"unknown") && (
176 (info->find(
"dn") == info->end()) ||
177 (info->find(
"nodn") != info->end()))
// Go through the addresses list one entry at a time, following ai_next until nullptr.
186 for (
const struct addrinfo *address = addresses; address !=
nullptr; address = address->ai_next) {
// Open a socket with this entry's family, socket type and protocol.
187 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
// Send the whole results string to this entry's address; the branch is taken when
// sendto returns a non-negative value.
191 if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
209 if (dot_pos == std::string::npos) {
// Stream that receives the "key":value fields written below.
// NOTE(review): string values are inserted as-is between quotes, with no escaping
// of quotes or backslashes — confirm the inputs cannot contain them.
219 std::ostringstream os;
// site_name is written only when siteName is non-empty.
223 if (!siteName.empty()) {
224 os <<
"\"site_name\":\"" << siteName <<
"\", ";
// NOTE(review): the condition guarding this field is not visible here — confirm.
227 os <<
"\"fallback\": true, ";
229 os <<
"\"user_dn\":\"" <<
m_userdn <<
"\", ";
235 os <<
"\"file_lfn\":\"" <<
m_filelfn <<
"\", ";
// jobId is written under the "app_info" key.
240 os <<
"\"app_info\":\"" << jobId <<
"\", ";
// file_size is written without surrounding quotes.
244 os <<
"\"file_size\":" <<
m_size <<
", ";
// Steps through certstack with a decreasing index, comparing each certificate's
// subject text with priorsubject.
// NOTE(review): going by the name, this is meant to locate the end-entity
// certificate; the return paths are not visible here — confirm.
269 static X509 *
findEEC(STACK_OF(X509) * certstack) {
// Number of certificates in the stack.
270 int depth = sk_X509_num(certstack);
275 char *priorsubject =
nullptr;
276 char *subject =
nullptr;
277 X509 *x509cert = sk_X509_value(certstack, idx);
// Continue while a certificate is available and idx is above 0, decrementing idx each pass.
278 for (; x509cert && idx>0; idx--) {
// One-line text form of this certificate's subject name; released with OPENSSL_free below.
279 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
// Holds when both strings exist and they differ within the first strlen(subject) characters.
280 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
283 x509cert = sk_X509_value(certstack, idx);
285 OPENSSL_free(subject);
290 OPENSSL_free(subject);
// Locals: the certificate stack, the subject text, and the three buffers that
// PEM_read_bio fills in (data, header, name).
299 STACK_OF(X509) *certs =
nullptr;
300 char *subject =
nullptr;
301 unsigned char *
data =
nullptr;
302 char *header =
nullptr;
303 char *
name =
nullptr;
// Open filename for reading as a BIO; the body runs only when that succeeds.
306 if((biof = BIO_new_file(filename.c_str(),
"r"))) {
// Start from an empty certificate stack.
308 certs = sk_X509_new_null();
309 bool encountered_error =
false;
// Keep reading PEM blocks until an error is flagged, the BIO reports EOF, or PEM_read_bio fails.
310 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
// Only blocks whose name matches the X509 certificate label (current or old) enter this branch.
311 if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
312 X509 * tmp_cert =
nullptr;
315 const unsigned char *
p;
// Decode len bytes starting at p into an X509 object.
317 tmp_cert = d2i_X509(&tmp_cert, &p, len);
319 sk_X509_push(certs, tmp_cert);
321 encountered_error =
true;
// Free each PEM buffer that is set and reset its pointer to nullptr.
324 if (data) { OPENSSL_free(data); data =
nullptr;}
325 if (header) { OPENSSL_free(header); header =
nullptr;}
326 if (name) { OPENSSL_free(name); name =
nullptr;}
328 X509 *x509cert =
nullptr;
// Proceed only when no error was flagged and the stack holds at least one certificate.
329 if (!encountered_error && sk_X509_num(certs)) {
// One-line text form of the chosen certificate's subject name.
333 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
// Free every certificate on the stack together with the stack itself.
337 sk_X509_pop_free(certs, X509_free);
343 OPENSSL_free(subject);
// File name taken from the X509_USER_PROXY environment variable; getenv returns
// a null pointer when the variable is not set.
352 char *
filename = getenv(
"X509_USER_PROXY");
// Builds the path "/tmp/x509up_u" followed by the effective user id.
// NOTE(review): presumably the fallback used when the variable is unset — confirm.
356 std::stringstream
ss;
357 ss <<
"/tmp/x509up_u" << geteuid();
void setCurrentServer(const std::string &servername)
FileStatistics m_filestats
static const StorageStats & summary(void)
void filePreCloseEvent(std::string const &lfn, bool usedFallback)
static bool getX509Subject(std::string &)
static X509 * findEEC(STACK_OF(X509)*certstack)
tbb::concurrent_unordered_map< int, OperationStats > StorageStats
std::string m_serverdomain
#define JOB_UNIQUE_ID_ENV_V2
static StorageClassToken tokenForStorageClassName(std::string const &iName)
#define JOB_UNIQUE_ID_ENV
void determineHostnames(void)
void fillUDP(const std::string &, bool, std::string &)
void watchPreCloseFile(PreCloseFile::slot_type const &iSlot)
void fillUDP(std::ostringstream &os)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void setSize(size_t size)
virtual std::string const & siteName(void) const =0
char data[epos_bytes_allocation]
static const char * getJobID()
std::string m_clientdomain
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
virtual struct addrinfo const * statisticsDestination() const =0
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
tuple size
Write out results.
virtual std::set< std::string > const * statisticsInfo() const =0
#define UPDATE_AND_OUTPUT_STATISTIC(x)