16 #include <openssl/x509.h>
17 #include <openssl/pem.h>
19 #define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", ";
24 #define HOST_NAME_MAX 128
30 using namespace edm::storage;
33 : m_read_single_operations(0),
34 m_read_single_bytes(0),
35 m_read_single_square(0),
36 m_read_vector_operations(0),
37 m_read_vector_bytes(0),
38 m_read_vector_square(0),
39 m_read_vector_count_sum(0),
40 m_read_vector_count_square(0),
45 ssize_t read_single_operations = 0;
46 ssize_t read_single_bytes = 0;
47 ssize_t read_single_square = 0;
48 ssize_t read_vector_operations = 0;
49 ssize_t read_vector_bytes = 0;
50 ssize_t read_vector_square = 0;
51 ssize_t read_vector_count_sum = 0;
52 ssize_t read_vector_count_square = 0;
54 for (StorageAccount::StorageStats::const_iterator
i = stats.begin();
i != stats.end(); ++
i) {
55 if (
i->first ==
token.value()) {
58 for (StorageAccount::OperationStats::const_iterator
j =
i->second.begin();
j !=
i->second.end(); ++
j) {
60 read_vector_operations +=
j->second.attempts;
61 read_vector_bytes +=
j->second.amount;
62 read_vector_count_square +=
j->second.vector_square;
63 read_vector_square +=
j->second.amount_square;
64 read_vector_count_sum +=
j->second.vector_count;
66 read_single_operations +=
j->second.attempts;
67 read_single_bytes +=
j->second.amount;
68 read_single_square +=
j->second.amount_square;
72 int64_t single_op_count = read_single_operations - m_read_single_operations;
73 if (single_op_count > 0) {
74 double single_sum = read_single_bytes - m_read_single_bytes;
75 double single_average = single_sum /
static_cast<double>(single_op_count);
76 os <<
"\"read_single_sigma\":"
77 <<
sqrt(
std::abs((static_cast<double>(read_single_square - m_read_single_square) -
78 single_average * single_average * single_op_count) /
79 static_cast<double>(single_op_count)))
81 os <<
"\"read_single_average\":" << single_average <<
", ";
83 int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
84 if (vector_op_count > 0) {
85 double vector_average =
86 static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
87 os <<
"\"read_vector_average\":" << vector_average <<
", ";
88 os <<
"\"read_vector_sigma\":"
89 <<
sqrt(
std::abs((static_cast<double>(read_vector_square - m_read_vector_square) -
90 vector_average * vector_average * vector_op_count) /
91 static_cast<double>(vector_op_count)))
93 double vector_count_average =
94 static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
95 os <<
"\"read_vector_count_average\":" << vector_count_average <<
", ";
96 os <<
"\"read_vector_count_sigma\":"
97 <<
sqrt(
std::abs((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
98 vector_count_average * vector_count_average * vector_op_count) /
99 static_cast<double>(vector_op_count)))
103 os <<
"\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
105 os <<
"\"read_bytes_at_close\":"
106 << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) <<
", ";
114 os <<
"\"start_time\":" << m_start_time <<
", ";
116 os <<
"\"end_time\":" << time(
nullptr);
121 ssize_t read_single_operations = 0;
122 ssize_t read_single_bytes = 0;
123 ssize_t read_single_square = 0;
124 ssize_t read_vector_operations = 0;
125 ssize_t read_vector_bytes = 0;
126 ssize_t read_vector_square = 0;
127 ssize_t read_vector_count_sum = 0;
128 ssize_t read_vector_count_square = 0;
130 for (StorageAccount::StorageStats::const_iterator
i = stats.begin();
i != stats.end(); ++
i) {
131 if (
i->first ==
token.value()) {
134 for (StorageAccount::OperationStats::const_iterator
j =
i->second.begin();
j !=
i->second.end(); ++
j) {
136 read_vector_operations +=
j->second.attempts;
137 read_vector_bytes +=
j->second.amount;
138 read_vector_count_square +=
j->second.vector_square;
139 read_vector_square +=
j->second.amount_square;
140 read_vector_count_sum +=
j->second.vector_count;
142 read_single_operations +=
j->second.attempts;
143 read_single_bytes +=
j->second.amount;
144 read_single_square +=
j->second.amount_square;
149 m_read_single_square = read_single_square;
150 m_read_vector_square = read_vector_square;
151 m_read_vector_count_square = read_vector_count_square;
152 m_read_vector_count_sum = read_vector_count_sum;
153 m_read_single_operations = read_single_operations;
154 m_read_single_bytes = read_single_bytes;
155 m_read_vector_operations = read_vector_operations;
156 m_read_vector_bytes = read_vector_bytes;
157 m_start_time = time(
nullptr);
161 m_serverhost(
"unknown"),
162 m_serverdomain(
"unknown"),
175 m_debug(iPSet.getUntrackedParameter<bool>(
"debug",
false)) {
192 return &
found->second;
195 if (
v.first.size() < iURL.size()) {
196 if (
v.first == iURL.substr(iURL.size() -
v.first.size())) {
203 if (std::string::npos == iURL.find(
':')) {
204 for (
auto const &
v : m_lfnToFileInfo) {
205 if ((std::string::npos !=
v.first.find(
':')) and (
v.first.size() > iURL.size())) {
206 if (iURL ==
v.first.substr(
v.first.size() - iURL.size())) {
218 size_t dot_pos = servername.find(
'.');
221 if (dot_pos == std::string::npos) {
222 serverhost = servername.substr(0, servername.find(
":"));
223 serverdomain =
"unknown";
225 serverhost = servername.substr(0, dot_pos);
226 serverdomain = servername.substr(dot_pos + 1, servername.find(
":") - dot_pos - 1);
227 if (serverdomain.empty()) {
228 serverdomain =
"unknown";
234 if (
nullptr != lfn) {
241 edm::LogWarning(
"StatisticsSenderService") <<
"setCurrentServer: unknown url name " << url <<
"\n";
249 if (attempt.second) {
250 attempt.first->second.m_size =
size;
251 attempt.first->second.m_id =
m_counter++;
252 edm::LogInfo(
"StatisticsSenderService") <<
"openingFile: opening " << lfn <<
"\n";
254 ++(attempt.first->second.m_openCount);
255 edm::LogInfo(
"StatisticsSenderService") <<
"openingFile: re-opening" << lfn <<
"\n";
271 if (info && !info->empty() && (
m_userdn !=
"unknown") &&
272 ((info->find(
"dn") == info->end()) || (info->find(
"nodn") != info->end()))) {
277 if (
nullptr != lfn) {
284 edm::LogSystem(
"StatisticSenderService") <<
"\n" << results <<
"\n";
287 for (
const struct addrinfo *address = addresses; address !=
nullptr; address = address->ai_next) {
288 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
292 auto close_del = [](
int *iSocket) { close(*iSocket); };
293 std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
294 if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
299 auto c = --
found->second.m_openCount;
302 edm::LogWarning(
"StatisticsSenderService") <<
"fully closed: " << *lfn <<
"\n";
304 edm::LogWarning(
"StatisticsSenderService") <<
"partially closed: " << *lfn <<
"\n";
308 edm::LogWarning(
"StatisticsSenderService") <<
"closed: unknown url name " << url <<
"\n";
314 bool moreToTest =
false;
318 if (it->second.m_openCount == 0) {
319 auto lfn = it->first;
320 bool moreToTest2 =
false;
324 if (it2->second == lfn) {
330 }
while (moreToTest2);
337 }
while (moreToTest);
342 if (
nullptr != lfn) {
345 itFound->second.m_size =
size;
348 edm::LogWarning(
"StatisticsSenderService") <<
"setSize: unknown url name " << url <<
"\n";
367 if (dot_pos == std::string::npos) {
379 std::ostringstream os;
383 if (!siteName.empty()) {
384 os <<
"\"site_name\":\"" << siteName <<
"\", ";
387 os <<
"\"fallback\": true, ";
389 os <<
"\"fallback\": false, ";
392 switch (fileinfo.
m_type) {
394 os <<
"\"primary\", ";
398 os <<
"\"secondary\", ";
402 os <<
"\"embedded\", ";
409 os <<
"\"user_dn\":\"" <<
m_userdn <<
"\", ";
412 os <<
"\"server_host\":\"" << serverhost <<
"\", ";
413 os <<
"\"server_domain\":\"" << serverdomain <<
"\", ";
414 os <<
"\"unique_id\":\"" <<
m_guid <<
"-" << fileinfo.
m_id <<
"\", ";
415 os <<
"\"file_lfn\":\"" << fileinfo.
m_filelfn <<
"\", ";
420 os <<
"\"app_info\":\"" << jobId <<
"\", ";
423 if (fileinfo.
m_size >= 0) {
424 os <<
"\"file_size\":" << fileinfo.
m_size <<
", ";
449 static X509 *
findEEC(STACK_OF(X509) * certstack) {
450 int depth = sk_X509_num(certstack);
455 char *priorsubject =
nullptr;
456 char *subject =
nullptr;
457 X509 *x509cert = sk_X509_value(certstack, idx);
458 for (; x509cert && idx > 0; idx--) {
459 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),
nullptr, 0);
460 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
463 x509cert = sk_X509_value(certstack, idx);
465 OPENSSL_free(subject);
470 OPENSSL_free(subject);
478 STACK_OF(X509) *certs =
nullptr;
479 char *subject =
nullptr;
480 unsigned char *
data =
nullptr;
481 char *header =
nullptr;
482 char *
name =
nullptr;
485 if ((biof = BIO_new_file(filename.c_str(),
"r"))) {
486 certs = sk_X509_new_null();
487 bool encountered_error =
false;
488 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
489 if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
490 X509 *tmp_cert =
nullptr;
493 const unsigned char *
p;
495 tmp_cert = d2i_X509(&tmp_cert, &p, len);
497 sk_X509_push(certs, tmp_cert);
499 encountered_error =
true;
507 OPENSSL_free(header);
515 X509 *x509cert =
nullptr;
516 if (!encountered_error && sk_X509_num(certs)) {
520 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),
nullptr, 0);
524 sk_X509_pop_free(certs, X509_free);
530 OPENSSL_free(subject);
538 char *
filename = getenv(
"X509_USER_PROXY");
542 std::stringstream
ss;
543 ss <<
"/tmp/x509up_u" << geteuid();
static char const *const JOB_UNIQUE_ID_ENV_V2
void setSize(const std::string &urlOrLfn, size_t size)
void determineHostnames()
FileStatistics m_filestats
#define OUTPUT_STATISTIC(x)
static const StorageStats & summary(void)
static bool getX509Subject(std::string &)
void fillUDP(const std::string &site, const FileInfo &fileinfo, bool, std::string &) const
tbb::concurrent_unordered_map< std::string, std::string > m_urlToLfn
static X509 * findEEC(STACK_OF(X509)*certstack)
static char const *const JOB_UNIQUE_ID_ENV
tbb::concurrent_unordered_map< int, OperationStats > StorageStats
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
std::atomic< ssize_t > m_size
std::string const * matchedLfn(std::string const &iURL)
Abs< T >::type abs(const T &t)
void fillUDP(std::ostringstream &os) const
void openingFile(std::string const &lfn, edm::InputType type, size_t size=-1)
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
tbb::concurrent_unordered_map< std::string, FileInfo > m_lfnToFileInfo
tuple idx
DEBUGGING if hasattr(process,"trackMonIterativeTracking2012"): print "trackMonIterativeTracking2012 D...
void closedFile(std::string const &lfn, bool usedFallback)
virtual std::string const & siteName(void) const =0
char data[epos_bytes_allocation]
static const char * getJobID()
std::string m_serverdomain
FileInfo(std::string const &iLFN, edm::InputType)
std::string m_clientdomain
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
volatile std::atomic< bool > shutdown_flag false
virtual struct addrinfo const * statisticsDestination() const =0
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
tuple size
Write out results.
virtual std::set< std::string > const * statisticsInfo() const =0
void filePostCloseEvent(std::string const &lfn, bool usedFallback)