15 #include <openssl/x509.h>
16 #include <openssl/pem.h>
18 #define UPDATE_STATISTIC(x) \
21 #define UPDATE_AND_OUTPUT_STATISTIC(x) \
22 os << "\"" #x "\":" << (x-m_ ## x) << ", "; \
28 #define HOST_NAME_MAX 128
31 #define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID"
32 #define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId"
34 using namespace edm::storage;
37 m_read_single_operations(0),
38 m_read_single_bytes(0),
39 m_read_single_square(0),
40 m_read_vector_operations(0),
41 m_read_vector_bytes(0),
42 m_read_vector_square(0),
43 m_read_vector_count_sum(0),
44 m_read_vector_count_square(0),
45 m_read_bytes_at_close(0),
52 ssize_t read_single_operations = 0;
53 ssize_t read_single_bytes = 0;
54 ssize_t read_single_square = 0;
55 ssize_t read_vector_operations = 0;
56 ssize_t read_vector_bytes = 0;
57 ssize_t read_vector_square = 0;
58 ssize_t read_vector_count_sum = 0;
59 ssize_t read_vector_count_square = 0;
60 for (StorageAccount::StorageStats::const_iterator
i = stats.begin ();
i != stats.end(); ++
i) {
61 if (
i->first ==
"tstoragefile") {
64 for (StorageAccount::OperationStats::const_iterator
j =
i->second->begin();
j !=
i->second->end(); ++
j) {
65 if (
j->first ==
"readv") {
66 read_vector_operations +=
j->second.attempts;
67 read_vector_bytes +=
j->second.amount;
68 read_vector_count_square +=
j->second.vector_square;
69 read_vector_square +=
j->second.amount_square;
70 read_vector_count_sum +=
j->second.vector_count;
71 }
else if (
j->first ==
"read") {
72 read_single_operations +=
j->second.attempts;
73 read_single_bytes +=
j->second.amount;
74 read_single_square +=
j->second.amount_square;
78 int64_t single_op_count = read_single_operations - m_read_single_operations;
79 if (single_op_count > 0) {
80 double single_sum = read_single_bytes-m_read_single_bytes;
81 double single_average = single_sum/
static_cast<double>(single_op_count);
82 os <<
"\"read_single_sigma\":" <<
sqrt((static_cast<double>(read_single_square-m_read_single_square) - single_average*single_average*single_op_count)/static_cast<double>(single_op_count)) <<
", ";
83 os <<
"\"read_single_average\":" << single_average <<
", ";
85 m_read_single_square = read_single_square;
86 int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
87 if (vector_op_count > 0) {
88 double vector_average =
static_cast<double>(read_vector_bytes-m_read_vector_bytes)/static_cast<double>(vector_op_count);
89 os <<
"\"read_vector_average\":" << vector_average <<
", ";
90 os <<
"\"read_vector_sigma\":" <<
sqrt((static_cast<double>(read_vector_square-m_read_vector_square) - vector_average*vector_average*vector_op_count)/static_cast<double>(vector_op_count)) <<
", ";
91 double vector_count_average =
static_cast<double>(read_vector_count_sum-m_read_vector_count_sum)/static_cast<double>(vector_op_count);
92 os <<
"\"read_vector_count_average\":" << vector_count_average <<
", ";
93 os <<
"\"read_vector_count_sigma\":" <<
sqrt((static_cast<double>(read_vector_count_square-m_read_vector_count_square) - vector_count_average*vector_count_average*vector_op_count)/static_cast<double>(vector_op_count)) <<
", ";
95 m_read_vector_square = read_vector_square;
96 m_read_vector_count_square = read_vector_count_square;
97 m_read_vector_count_sum = read_vector_count_sum;
99 os <<
"\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) <<
", ";
100 os <<
"\"read_bytes_at_close\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) <<
", ";
108 os <<
"\"start_time\":" << m_start_time <<
", ";
111 os <<
"\"end_time\":" << m_start_time;
142 size_t dot_pos = servername.find(
".");
143 if (dot_pos == std::string::npos) {
144 m_serverhost = servername.substr(0, servername.find(
":"));
148 m_serverdomain = servername.substr(dot_pos+1, servername.find(
":")-dot_pos-1);
177 for (
const struct addrinfo *address = addresses; address !=
nullptr; address = address->ai_next) {
178 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
182 if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
200 if (dot_pos == std::string::npos) {
210 std::ostringstream os;
214 if (!siteName.empty()) {
215 os <<
"\"site_name\":\"" << siteName <<
"\", ";
218 os <<
"\"fallback\": true, ";
220 os <<
"\"user_dn\":\"" <<
m_userdn <<
"\", ";
226 os <<
"\"file_lfn\":\"" <<
m_filelfn <<
"\", ";
231 os <<
"\"app_info\":\"" << jobId <<
"\", ";
235 os <<
"\"file_size\":" <<
m_size <<
", ";
260 static X509 *
findEEC(STACK_OF(X509) * certstack) {
261 int depth = sk_X509_num(certstack);
266 char *priorsubject =
nullptr;
267 char *subject =
nullptr;
268 X509 *x509cert = sk_X509_value(certstack, idx);
269 for (; x509cert && idx>0; idx--) {
270 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
271 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
274 x509cert = sk_X509_value(certstack, idx);
276 OPENSSL_free(subject);
281 OPENSSL_free(subject);
290 STACK_OF(X509) *certs =
nullptr;
291 char *subject =
nullptr;
292 unsigned char *
data =
nullptr;
293 char *header =
nullptr;
294 char *
name =
nullptr;
297 if((biof = BIO_new_file(filename.c_str(),
"r"))) {
299 certs = sk_X509_new_null();
300 bool encountered_error =
false;
301 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
302 if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
303 X509 * tmp_cert =
nullptr;
306 const unsigned char *
p;
308 tmp_cert = d2i_X509(&tmp_cert, &p, len);
310 sk_X509_push(certs, tmp_cert);
312 encountered_error =
true;
315 if (data) { OPENSSL_free(data); data =
nullptr;}
316 if (header) { OPENSSL_free(header); header =
nullptr;}
317 if (name) { OPENSSL_free(name); name =
nullptr;}
319 X509 *x509cert =
nullptr;
320 if (!encountered_error && sk_X509_num(certs)) {
324 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
328 sk_X509_pop_free(certs, X509_free);
334 OPENSSL_free(subject);
343 char *
filename = getenv(
"X509_USER_PROXY");
347 std::stringstream
ss;
348 ss <<
"/tmp/x509up_u" << geteuid();
void setCurrentServer(const std::string &servername)
FileStatistics m_filestats
static const StorageStats & summary(void)
void filePreCloseEvent(std::string const &lfn, bool usedFallback)
static bool getX509Subject(std::string &)
std::map< std::string, boost::shared_ptr< OperationStats > > StorageStats
static X509 * findEEC(STACK_OF(X509)*certstack)
std::string m_serverdomain
#define JOB_UNIQUE_ID_ENV_V2
#define JOB_UNIQUE_ID_ENV
void determineHostnames(void)
void fillUDP(const std::string &, bool, std::string &)
void watchPreCloseFile(PreCloseFile::slot_type const &iSlot)
void fillUDP(std::ostringstream &os)
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void setSize(size_t size)
virtual std::string const & siteName(void) const =0
char data[epos_bytes_allocation]
static const char * getJobID()
std::string m_clientdomain
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
virtual struct addrinfo const * statisticsDestination() const =0
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
tuple size
Write out results.
#define UPDATE_AND_OUTPUT_STATISTIC(x)