17 #include <openssl/x509.h> 18 #include <openssl/pem.h> 20 #define OUTPUT_STATISTIC(x) os << "\"" #x "\":" << (x - m_##x) << ", "; 25 #define HOST_NAME_MAX 128 34 : m_read_single_operations(0),
35 m_read_single_bytes(0),
36 m_read_single_square(0),
37 m_read_vector_operations(0),
38 m_read_vector_bytes(0),
39 m_read_vector_square(0),
40 m_read_vector_count_sum(0),
41 m_read_vector_count_square(0),
46 ssize_t read_single_operations = 0;
47 ssize_t read_single_bytes = 0;
48 ssize_t read_single_square = 0;
49 ssize_t read_vector_operations = 0;
50 ssize_t read_vector_bytes = 0;
51 ssize_t read_vector_square = 0;
52 ssize_t read_vector_count_sum = 0;
53 ssize_t read_vector_count_square = 0;
55 for (StorageAccount::StorageStats::const_iterator
i = stats.begin();
i != stats.end(); ++
i) {
56 if (
i->first == token.value()) {
59 for (StorageAccount::OperationStats::const_iterator j =
i->second.begin(); j !=
i->second.end(); ++j) {
61 read_vector_operations += j->second.attempts;
62 read_vector_bytes += j->second.amount;
63 read_vector_count_square += j->second.vector_square;
64 read_vector_square += j->second.amount_square;
65 read_vector_count_sum += j->second.vector_count;
67 read_single_operations += j->second.attempts;
68 read_single_bytes += j->second.amount;
69 read_single_square += j->second.amount_square;
74 if (single_op_count > 0) {
76 double single_average = single_sum /
static_cast<double>(single_op_count);
77 os <<
"\"read_single_sigma\":" 79 single_average * single_average * single_op_count) /
80 static_cast<double>(single_op_count)))
82 os <<
"\"read_single_average\":" << single_average <<
", ";
85 if (vector_op_count > 0) {
86 double vector_average =
87 static_cast<double>(read_vector_bytes -
m_read_vector_bytes) / static_cast<double>(vector_op_count);
88 os <<
"\"read_vector_average\":" << vector_average <<
", ";
89 os <<
"\"read_vector_sigma\":" 91 vector_average * vector_average * vector_op_count) /
92 static_cast<double>(vector_op_count)))
94 double vector_count_average =
96 os <<
"\"read_vector_count_average\":" << vector_count_average <<
", ";
97 os <<
"\"read_vector_count_sigma\":" 99 vector_count_average * vector_count_average * vector_op_count) /
100 static_cast<double>(vector_op_count)))
106 os <<
"\"read_bytes_at_close\":" 117 os <<
"\"end_time\":" <<
time(
nullptr);
122 ssize_t read_single_operations = 0;
123 ssize_t read_single_bytes = 0;
124 ssize_t read_single_square = 0;
125 ssize_t read_vector_operations = 0;
126 ssize_t read_vector_bytes = 0;
127 ssize_t read_vector_square = 0;
128 ssize_t read_vector_count_sum = 0;
129 ssize_t read_vector_count_square = 0;
131 for (StorageAccount::StorageStats::const_iterator
i = stats.begin();
i != stats.end(); ++
i) {
132 if (
i->first == token.value()) {
135 for (StorageAccount::OperationStats::const_iterator j =
i->second.begin(); j !=
i->second.end(); ++j) {
137 read_vector_operations += j->second.attempts;
138 read_vector_bytes += j->second.amount;
139 read_vector_count_square += j->second.vector_square;
140 read_vector_square += j->second.amount_square;
141 read_vector_count_sum += j->second.vector_count;
143 read_single_operations += j->second.attempts;
144 read_single_bytes += j->second.amount;
145 read_single_square += j->second.amount_square;
162 m_serverhost(
"unknown"),
163 m_serverdomain(
"unknown"),
193 return &
found->second;
196 if (
v.first.size() < iURL.size()) {
197 if (
v.first == iURL.substr(iURL.size() -
v.first.size())) {
204 if (std::string::npos == iURL.find(
':')) {
205 for (
auto const &
v : m_lfnToFileInfo) {
206 if ((std::string::npos !=
v.first.find(
':')) and (
v.first.size() > iURL.size())) {
207 if (iURL ==
v.first.substr(
v.first.size() - iURL.size())) {
219 size_t dot_pos = servername.find(
'.');
222 if (dot_pos == std::string::npos) {
223 serverhost = servername.substr(0, servername.find(
":"));
224 serverdomain =
"unknown";
226 serverhost = servername.substr(0, dot_pos);
227 serverdomain = servername.substr(dot_pos + 1, servername.find(
":") - dot_pos - 1);
228 if (serverdomain.empty()) {
229 serverdomain =
"unknown";
235 if (
nullptr != lfn) {
242 edm::LogWarning(
"StatisticsSenderService") <<
"setCurrentServer: unknown url name " << url <<
"\n";
250 if (attempt.second) {
251 attempt.first->second.m_size =
size;
252 attempt.first->second.m_id =
m_counter++;
253 edm::LogInfo(
"StatisticsSenderService") <<
"openingFile: opening " << lfn <<
"\n";
255 ++(attempt.first->second.m_openCount);
256 edm::LogInfo(
"StatisticsSenderService") <<
"openingFile: re-opening" << lfn <<
"\n";
272 if (info && !info->empty() && (
m_userdn !=
"unknown") &&
273 ((info->find(
"dn") == info->end()) || (info->find(
"nodn") != info->end()))) {
278 if (
nullptr != lfn) {
285 edm::LogSystem(
"StatisticSenderService") <<
"\n" << results <<
"\n";
288 for (
const struct addrinfo *address = addresses; address !=
nullptr; address = address->ai_next) {
289 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
293 auto close_del = [](
int *iSocket) { close(*iSocket); };
294 std::unique_ptr<int, decltype(close_del)> guard(&sock, close_del);
295 if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
300 auto c = --
found->second.m_openCount;
303 edm::LogWarning(
"StatisticsSenderService") <<
"fully closed: " << *lfn <<
"\n";
305 edm::LogWarning(
"StatisticsSenderService") <<
"partially closed: " << *lfn <<
"\n";
309 edm::LogWarning(
"StatisticsSenderService") <<
"closed: unknown url name " << url <<
"\n";
315 bool moreToTest =
false;
319 if (it->second.m_openCount == 0) {
320 auto lfn = it->first;
321 bool moreToTest2 =
false;
325 if (it2->second == lfn) {
331 }
while (moreToTest2);
338 }
while (moreToTest);
343 if (
nullptr != lfn) {
346 itFound->second.m_size =
size;
349 edm::LogWarning(
"StatisticsSenderService") <<
"setSize: unknown url name " << url <<
"\n";
368 if (dot_pos == std::string::npos) {
380 std::ostringstream os;
384 if (!siteName.empty()) {
385 os <<
"\"site_name\":\"" << siteName <<
"\", ";
391 os <<
"\"fallback\": true, ";
393 os <<
"\"fallback\": false, ";
396 switch (fileinfo.
m_type) {
398 os <<
"\"primary\", ";
402 os <<
"\"secondary\", ";
406 os <<
"\"embedded\", ";
413 os <<
"\"user_dn\":\"" <<
m_userdn <<
"\", ";
416 os <<
"\"server_host\":\"" << serverhost <<
"\", ";
417 os <<
"\"server_domain\":\"" << serverdomain <<
"\", ";
418 os <<
"\"unique_id\":\"" <<
m_guid <<
"-" << fileinfo.
m_id <<
"\", ";
419 os <<
"\"file_lfn\":\"" << fileinfo.
m_filelfn <<
"\", ";
424 os <<
"\"app_info\":\"" << jobId <<
"\", ";
427 if (fileinfo.
m_size >= 0) {
428 os <<
"\"file_size\":" << fileinfo.
m_size <<
", ";
454 int depth = sk_X509_num(certstack);
459 char *priorsubject =
nullptr;
460 char *subject =
nullptr;
461 X509 *x509cert = sk_X509_value(certstack, idx);
462 for (; x509cert && idx > 0; idx--) {
463 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),
nullptr, 0);
464 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
467 x509cert = sk_X509_value(certstack, idx);
469 OPENSSL_free(subject);
474 OPENSSL_free(subject);
482 STACK_OF(
X509) *certs =
nullptr;
483 char *subject =
nullptr;
484 unsigned char *
data =
nullptr;
486 char *
name =
nullptr;
489 if ((biof = BIO_new_file(filename.c_str(),
"r"))) {
490 certs = sk_X509_new_null();
491 bool encountered_error =
false;
492 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
493 if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
494 X509 *tmp_cert =
nullptr;
497 const unsigned char *
p;
499 tmp_cert = d2i_X509(&tmp_cert, &p, len);
501 sk_X509_push(certs, tmp_cert);
503 encountered_error =
true;
511 OPENSSL_free(header);
519 X509 *x509cert =
nullptr;
520 if (!encountered_error && sk_X509_num(certs)) {
524 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),
nullptr, 0);
528 sk_X509_pop_free(certs, X509_free);
534 OPENSSL_free(subject);
542 char *
filename = getenv(
"X509_USER_PROXY");
546 std::stringstream ss;
547 ss <<
"/tmp/x509up_u" << geteuid();
virtual struct addrinfo const * statisticsDestination() const =0
ssize_t m_read_single_operations
static char const *const JOB_UNIQUE_ID_ENV_V2
ssize_t m_read_vector_bytes
void setSize(const std::string &urlOrLfn, size_t size)
void determineHostnames()
FileStatistics m_filestats
#define OUTPUT_STATISTIC(x)
ssize_t m_read_single_bytes
static const StorageStats & summary(void)
static bool getX509Subject(std::string &)
void fillUDP(const std::string &site, const FileInfo &fileinfo, bool, std::string &) const
virtual std::set< std::string > const * statisticsInfo() const =0
tbb::concurrent_unordered_map< std::string, std::string > m_urlToLfn
static X509 * findEEC(STACK_OF(X509)*certstack)
static char const *const JOB_UNIQUE_ID_ENV
tbb::concurrent_unordered_map< int, OperationStats > StorageStats
void watchPostCloseFile(PostCloseFile::slot_type const &iSlot)
static StorageClassToken tokenForStorageClassName(std::string const &iName)
ssize_t m_read_vector_count_sum
virtual std::string const & siteName(void) const =0
std::atomic< ssize_t > m_size
std::string const * matchedLfn(std::string const &iURL)
Abs< T >::type abs(const T &t)
std::string toString(const std::pair< T, T > &aT)
void fillUDP(std::ostringstream &os) const
void openingFile(std::string const &lfn, edm::InputType type, size_t size=-1)
std::string getReleaseVersion()
ssize_t m_read_vector_square
void setCurrentServer(const std::string &urlOrLfn, const std::string &servername)
tbb::concurrent_unordered_map< std::string, FileInfo > m_lfnToFileInfo
void closedFile(std::string const &lfn, bool usedFallback)
char data[epos_bytes_allocation]
static const char * getJobID()
ssize_t m_read_single_square
ssize_t m_read_vector_operations
std::string m_serverdomain
FileInfo(std::string const &iLFN, edm::InputType)
std::string m_clientdomain
StatisticsSenderService(edm::ParameterSet const &pset, edm::ActivityRegistry &ar)
ssize_t m_read_vector_count_square
static bool getX509SubjectFromFile(const std::string &filename, std::string &result)
void filePostCloseEvent(std::string const &lfn, bool usedFallback)