17 #include <openssl/x509.h> 18 #include <openssl/pem.h> 20 #define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", "; 25 #define HOST_NAME_MAX 128 34 : m_read_single_operations(0),
35 m_read_single_bytes(0),
36 m_read_single_square(0),
37 m_read_vector_operations(0),
38 m_read_vector_bytes(0),
39 m_read_vector_square(0),
40 m_read_vector_count_sum(0),
41 m_read_vector_count_square(0),
42 m_start_time(
time(nullptr)) {}
46 ssize_t read_single_operations = 0;
47 ssize_t read_single_bytes = 0;
48 ssize_t read_single_square = 0;
49 ssize_t read_vector_operations = 0;
50 ssize_t read_vector_bytes = 0;
51 ssize_t read_vector_square = 0;
52 ssize_t read_vector_count_sum = 0;
53 ssize_t read_vector_count_square = 0;
55 for (StorageAccount::StorageStats::const_iterator
i =
stats.begin();
i !=
stats.end(); ++
i) {
56 if (
i->first ==
token.value()) {
59 for (StorageAccount::OperationStats::const_iterator
j =
i->second.begin();
j !=
i->second.end(); ++
j) {
61 read_vector_operations +=
j->second.attempts;
62 read_vector_bytes +=
j->second.amount;
63 read_vector_count_square +=
j->second.vector_square;
64 read_vector_square +=
j->second.amount_square;
65 read_vector_count_sum +=
j->second.vector_count;
67 read_single_operations +=
j->second.attempts;
68 read_single_bytes +=
j->second.amount;
69 read_single_square +=
j->second.amount_square;
73 int64_t single_op_count = read_single_operations - m_read_single_operations;
74 if (single_op_count > 0) {
75 double single_sum = read_single_bytes - m_read_single_bytes;
76 double single_average = single_sum /
static_cast<double>(single_op_count);
77 os <<
"\"read_single_sigma\":" 78 <<
sqrt(
std::abs((static_cast<double>(read_single_square - m_read_single_square) -
79 single_average * single_average * single_op_count) /
80 static_cast<double>(single_op_count)))
82 os <<
"\"read_single_average\":" << single_average <<
", ";
84 int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
85 if (vector_op_count > 0) {
86 double vector_average =
87 static_cast<double>(read_vector_bytes - m_read_vector_bytes) / static_cast<double>(vector_op_count);
88 os <<
"\"read_vector_average\":" << vector_average <<
", ";
89 os <<
"\"read_vector_sigma\":" 90 <<
sqrt(
std::abs((static_cast<double>(read_vector_square - m_read_vector_square) -
91 vector_average * vector_average * vector_op_count) /
92 static_cast<double>(vector_op_count)))
94 double vector_count_average =
95 static_cast<double>(read_vector_count_sum - m_read_vector_count_sum) / static_cast<double>(vector_op_count);
96 os <<
"\"read_vector_count_average\":" << vector_count_average <<
", ";
97 os <<
"\"read_vector_count_sigma\":" 98 <<
sqrt(
std::abs((static_cast<double>(read_vector_count_square - m_read_vector_count_square) -
99 vector_count_average * vector_count_average * vector_op_count) /
100 static_cast<double>(vector_op_count)))
104 os <<
"\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes)
106 os <<
"\"read_bytes_at_close\":" 107 << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) <<
", ";
115 os <<
"\"start_time\":" << m_start_time <<
", ";
117 os <<
"\"end_time\":" <<
time(
nullptr);
122 ssize_t read_single_operations = 0;
123 ssize_t read_single_bytes = 0;
124 ssize_t read_single_square = 0;
125 ssize_t read_vector_operations = 0;
126 ssize_t read_vector_bytes = 0;
127 ssize_t read_vector_square = 0;
128 ssize_t read_vector_count_sum = 0;
129 ssize_t read_vector_count_square = 0;
131 for (StorageAccount::StorageStats::const_iterator
i =
stats.begin();
i !=
stats.end(); ++
i) {
132 if (
i->first ==
token.value()) {
135 for (StorageAccount::OperationStats::const_iterator
j =
i->second.begin();
j !=
i->second.end(); ++
j) {
137 read_vector_operations +=
j->second.attempts;
138 read_vector_bytes +=
j->second.amount;
139 read_vector_count_square +=
j->second.vector_square;
140 read_vector_square +=
j->second.amount_square;
141 read_vector_count_sum +=
j->second.vector_count;
143 read_single_operations +=
j->second.attempts;
144 read_single_bytes +=
j->second.amount;
145 read_single_square +=
j->second.amount_square;
150 m_read_single_square = read_single_square;
151 m_read_vector_square = read_vector_square;
152 m_read_vector_count_square = read_vector_count_square;
153 m_read_vector_count_sum = read_vector_count_sum;
154 m_read_single_operations = read_single_operations;
155 m_read_single_bytes = read_single_bytes;
156 m_read_vector_operations = read_vector_operations;
157 m_read_vector_bytes = read_vector_bytes;
158 m_start_time =
time(
nullptr);
162 m_serverhost(
"unknown"),
163 m_serverdomain(
"unknown"),
193 return &
found->second;
196 if (
v.first.size() < iURL.size()) {
197 if (
v.first == iURL.substr(iURL.size() -
v.first.size())) {
204 if (std::string::npos == iURL.find(
':')) {
206 if ((std::string::npos !=
v.first.find(
':')) and (
v.first.size() > iURL.size())) {
207 if (iURL ==
v.first.substr(
v.first.size() - iURL.size())) {
219 size_t dot_pos = servername.find(
'.');
222 if (dot_pos == std::string::npos) {
223 serverhost = servername.substr(0, servername.find(
':'));
224 serverdomain =
"unknown";
226 serverhost = servername.substr(0, dot_pos);
227 serverdomain = servername.substr(dot_pos + 1, servername.find(
':') - dot_pos - 1);
228 if (serverdomain.empty()) {
229 serverdomain =
"unknown";
235 if (
nullptr != lfn) {
242 edm::LogWarning(
"StatisticsSenderService") <<
"setCurrentServer: unknown url name " <<
url <<
"\n";
250 if (attempt.second) {
251 attempt.first->second.m_size =
size;
252 attempt.first->second.m_id =
m_counter++;
253 edm::LogInfo(
"StatisticsSenderService") <<
"openingFile: opening " << lfn <<
"\n";
255 ++(attempt.first->second.m_openCount);
256 edm::LogInfo(
"StatisticsSenderService") <<
"openingFile: re-opening" << lfn <<
"\n";
273 ((
info->find(
"dn") ==
info->end()) || (
info->find(
"nodn") !=
info->end()))) {
278 if (
nullptr != lfn) {
288 for (
const struct addrinfo *address = addresses; address !=
nullptr; address = address->ai_next) {
289 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
293 auto close_del = [](
int *iSocket) { close(*iSocket); };
294 std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
295 if (sendto(sock,
results.c_str(),
results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
300 auto c = --
found->second.m_openCount;
303 edm::LogWarning(
"StatisticsSenderService") <<
"fully closed: " << *lfn <<
"\n";
305 edm::LogWarning(
"StatisticsSenderService") <<
"partially closed: " << *lfn <<
"\n";
309 edm::LogWarning(
"StatisticsSenderService") <<
"closed: unknown url name " <<
url <<
"\n";
315 bool moreToTest =
false;
319 if (it->second.m_openCount == 0) {
320 auto lfn = it->first;
321 bool moreToTest2 =
false;
325 if (it2->second == lfn) {
331 }
while (moreToTest2);
338 }
while (moreToTest);
343 if (
nullptr != lfn) {
346 itFound->second.m_size =
size;
349 edm::LogWarning(
"StatisticsSenderService") <<
"setSize: unknown url name " <<
url <<
"\n";
368 if (dot_pos == std::string::npos) {
380 std::ostringstream os;
384 if (!siteName.empty()) {
385 os <<
"\"site_name\":\"" << siteName <<
"\", ";
391 os <<
"\"fallback\": true, ";
393 os <<
"\"fallback\": false, ";
395 os <<
"\"read_type\": ";
396 switch (fileinfo.
m_type) {
398 os <<
"\"primary\", ";
402 os <<
"\"secondary\", ";
406 os <<
"\"embedded\", ";
413 os <<
"\"user_dn\":\"" <<
m_userdn <<
"\", ";
416 os <<
"\"server_host\":\"" << serverhost <<
"\", ";
417 os <<
"\"server_domain\":\"" << serverdomain <<
"\", ";
418 os <<
"\"unique_id\":\"" <<
m_guid <<
"-" << fileinfo.
m_id <<
"\", ";
419 os <<
"\"file_lfn\":\"" << fileinfo.
m_filelfn <<
"\", ";
424 os <<
"\"app_info\":\"" << jobId <<
"\", ";
427 if (fileinfo.
m_size >= 0) {
428 os <<
"\"file_size\":" << fileinfo.
m_size <<
", ";
454 int depth = sk_X509_num(certstack);
459 char *priorsubject =
nullptr;
460 char *subject =
nullptr;
461 X509 *x509cert = sk_X509_value(certstack,
idx);
462 for (; x509cert &&
idx > 0;
idx--) {
463 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),
nullptr, 0);
464 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
467 x509cert = sk_X509_value(certstack,
idx);
469 OPENSSL_free(subject);
474 OPENSSL_free(subject);
482 STACK_OF(
X509) *certs =
nullptr;
483 char *subject =
nullptr;
484 unsigned char *
data =
nullptr;
486 char *
name =
nullptr;
489 if ((biof = BIO_new_file(
filename.c_str(),
"r"))) {
490 certs = sk_X509_new_null();
491 bool encountered_error =
false;
492 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &
name, &
header, &
data, &len)) {
493 if (strcmp(
name, PEM_STRING_X509) == 0 || strcmp(
name, PEM_STRING_X509_OLD) == 0) {
494 X509 *tmp_cert =
nullptr;
497 const unsigned char *
p;
499 tmp_cert = d2i_X509(&tmp_cert, &
p, len);
501 sk_X509_push(certs, tmp_cert);
503 encountered_error =
true;
519 X509 *x509cert =
nullptr;
520 if (!encountered_error && sk_X509_num(certs)) {
524 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),
nullptr, 0);
528 sk_X509_pop_free(certs, X509_free);
534 OPENSSL_free(subject);
542 char *
filename = std::getenv(
"X509_USER_PROXY");
546 std::stringstream
ss;
547 ss <<
"/tmp/x509up_u" << geteuid();
Log< level::System, false > LogSystem
static StorageClassToken tokenForStorageClassName(std::string const &iName)
void setSize(const std::string &urlOrLfn, size_t size)
oneapi::tbb::concurrent_unordered_map< int, OperationStats > StorageStats
void determineHostnames()
FileStatistics m_filestats
#define OUTPUT_STATISTIC(x)
void fillUDP(const std::string &site, const FileInfo &fileinfo, bool, std::string &) const
static bool getX509Subject(std::string &)
static constexpr char const *const JOB_UNIQUE_ID_ENV_V2
Guid const & processGUID()
std::string const * matchedLfn(std::string const &iURL)
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
std::atomic< ssize_t > m_size
Abs< T >::type abs(const T &t)
static constexpr char const *const JOB_UNIQUE_ID_ENV
oneapi::tbb::concurrent_unordered_map< std::string, FileInfo > m_lfnToFileInfo
void openingFile(std::string const &lfn, edm::InputType type, size_t size=-1)
std::string getReleaseVersion()
Log< level::Info, false > LogInfo
void fillUDP(std::ostringstream &os) const
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
static X509 * findEEC(STACK_OF(X509) *certstack)
void closedFile(std::string const &lfn, bool usedFallback)
static const StorageStats & summary(void)
virtual std::string const & siteName(void) const =0
oneapi::tbb::concurrent_unordered_map< std::string, std::string > m_urlToLfn
char data[epos_bytes_allocation]
static const char * getJobID()
std::string m_serverdomain
FileInfo(std::string const &iLFN, edm::InputType)
std::string m_clientdomain
void filePostCloseEvent(std::string const &lfn)
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
Log< level::Warning, false > LogWarning
virtual struct addrinfo const * statisticsDestination() const =0
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
virtual std::set< std::string > const * statisticsInfo() const =0