00001
00002 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
00003 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00004 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00005 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
00006 #include "FWCore/ServiceRegistry/interface/Service.h"
00007 #include "FWCore/Utilities/src/Guid.h"
00008
00009 #include <string>
00010
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <string.h>
00014
00015 #include <openssl/x509.h>
00016 #include <openssl/pem.h>
00017
// Remembers the freshly sampled counter `x` in the matching member `m_x`.
// The expansion already ends with ';', so call sites do not add one.
#define UPDATE_STATISTIC(x) m_##x = x;

// Writes the JSON member `"x":<x - m_x>, ` to a stream named `os` that must be
// in scope at the call site, then stores `x` into `m_x` via UPDATE_STATISTIC.
#define UPDATE_AND_OUTPUT_STATISTIC(x)        \
  os << "\"" #x "\":" << (x - m_##x) << ", "; \
  UPDATE_STATISTIC(x)
00024
00025
00026
// Fallback buffer length for gethostname() when the system headers included
// so far do not provide HOST_NAME_MAX.
#if !defined(HOST_NAME_MAX)
#  define HOST_NAME_MAX 128
#endif

// Environment variables consulted by getJobID(): the first one is tried
// first, the second one is used when the first is not set.
#define JOB_UNIQUE_ID_ENV    "CRAB_UNIQUE_JOB_ID"
#define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId"
00033
00034 using namespace edm::storage;
00035
00036 StatisticsSenderService::FileStatistics::FileStatistics() :
00037 m_read_single_operations(0),
00038 m_read_single_bytes(0),
00039 m_read_single_square(0),
00040 m_read_vector_operations(0),
00041 m_read_vector_bytes(0),
00042 m_read_vector_square(0),
00043 m_read_vector_count_sum(0),
00044 m_read_vector_count_square(0),
00045 m_read_bytes_at_close(0),
00046 m_start_time(time(NULL))
00047 {}
00048
00049 void
00050 StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) {
00051 const StorageAccount::StorageStats &stats = StorageAccount::summary();
00052 ssize_t read_single_operations = 0;
00053 ssize_t read_single_bytes = 0;
00054 ssize_t read_single_square = 0;
00055 ssize_t read_vector_operations = 0;
00056 ssize_t read_vector_bytes = 0;
00057 ssize_t read_vector_square = 0;
00058 ssize_t read_vector_count_sum = 0;
00059 ssize_t read_vector_count_square = 0;
00060 for (StorageAccount::StorageStats::const_iterator i = stats.begin (); i != stats.end(); ++i) {
00061 if (i->first == "tstoragefile") {
00062 continue;
00063 }
00064 for (StorageAccount::OperationStats::const_iterator j = i->second->begin(); j != i->second->end(); ++j) {
00065 if (j->first == "readv") {
00066 read_vector_operations += j->second.attempts;
00067 read_vector_bytes += j->second.amount;
00068 read_vector_count_square += j->second.vector_square;
00069 read_vector_square += j->second.amount_square;
00070 read_vector_count_sum += j->second.vector_count;
00071 } else if (j->first == "read") {
00072 read_single_operations += j->second.attempts;
00073 read_single_bytes += j->second.amount;
00074 read_single_square += j->second.amount_square;
00075 }
00076 }
00077 }
00078 int64_t single_op_count = read_single_operations - m_read_single_operations;
00079 if (single_op_count > 0) {
00080 double single_sum = read_single_bytes-m_read_single_bytes;
00081 double single_average = single_sum/static_cast<double>(single_op_count);
00082 os << "\"read_single_sigma\":" << sqrt((static_cast<double>(read_single_square-m_read_single_square) - single_average*single_average*single_op_count)/static_cast<double>(single_op_count)) << ", ";
00083 os << "\"read_single_average\":" << single_average << ", ";
00084 }
00085 m_read_single_square = read_single_square;
00086 int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
00087 if (vector_op_count > 0) {
00088 double vector_average = static_cast<double>(read_vector_bytes-m_read_vector_bytes)/static_cast<double>(vector_op_count);
00089 os << "\"read_vector_average\":" << vector_average << ", ";
00090 os << "\"read_vector_sigma\":" << sqrt((static_cast<double>(read_vector_square-m_read_vector_square) - vector_average*vector_average*vector_op_count)/static_cast<double>(vector_op_count)) << ", ";
00091 double vector_count_average = static_cast<double>(read_vector_count_sum-m_read_vector_count_sum)/static_cast<double>(vector_op_count);
00092 os << "\"read_vector_count_average\":" << vector_count_average << ", ";
00093 os << "\"read_vector_count_sigma\":" << sqrt((static_cast<double>(read_vector_count_square-m_read_vector_count_square) - vector_count_average*vector_count_average*vector_op_count)/static_cast<double>(vector_op_count)) << ", ";
00094 }
00095 m_read_vector_square = read_vector_square;
00096 m_read_vector_count_square = read_vector_count_square;
00097 m_read_vector_count_sum = read_vector_count_sum;
00098
00099 os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
00100 os << "\"read_bytes_at_close\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
00101
00102
00103 UPDATE_AND_OUTPUT_STATISTIC(read_single_operations)
00104 UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes)
00105 UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations)
00106 UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes)
00107
00108 os << "\"start_time\":" << m_start_time << ", ";
00109 m_start_time = time(NULL);
00110
00111 os << "\"end_time\":" << m_start_time;
00112 }
00113
00114 StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const& , edm::ActivityRegistry& ar) :
00115 m_clienthost("unknown"),
00116 m_clientdomain("unknown"),
00117 m_serverhost("unknown"),
00118 m_serverdomain("unknown"),
00119 m_filelfn("unknown"),
00120 m_filestats(),
00121 m_guid(Guid().toString()),
00122 m_counter(0),
00123 m_size(-1),
00124 m_userdn("unknown")
00125 {
00126 determineHostnames();
00127 ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent);
00128 if (!getX509Subject(m_userdn)) {
00129 m_userdn = "unknown";
00130 }
00131 }
00132
00133 const char *
00134 StatisticsSenderService::getJobID() {
00135 const char * id = getenv(JOB_UNIQUE_ID_ENV);
00136
00137 return id ? id : getenv(JOB_UNIQUE_ID_ENV_V2);
00138 }
00139
00140 void
00141 StatisticsSenderService::setCurrentServer(const std::string &servername) {
00142 size_t dot_pos = servername.find(".");
00143 if (dot_pos == std::string::npos) {
00144 m_serverhost = servername.substr(0, servername.find(":"));
00145 m_serverdomain = "unknown";
00146 } else {
00147 m_serverhost = servername.substr(0, dot_pos);
00148 m_serverdomain = servername.substr(dot_pos+1, servername.find(":")-dot_pos-1);
00149 if (m_serverdomain.empty()) {
00150 m_serverdomain = "unknown";
00151 }
00152 }
00153 }
00154
00155 void
00156 StatisticsSenderService::setSize(size_t size) {
00157 m_size = size;
00158 }
00159
00160 void
00161 StatisticsSenderService::filePreCloseEvent(std::string const& lfn, bool usedFallback) {
00162 m_filelfn = lfn;
00163
00164 edm::Service<edm::SiteLocalConfig> pSLC;
00165 if (!pSLC.isAvailable()) {
00166 return;
00167 }
00168
00169 const struct addrinfo * addresses = pSLC->statisticsDestination();
00170 if (!addresses) {
00171 return;
00172 }
00173
00174 std::string results;
00175 fillUDP(pSLC->siteName(), usedFallback, results);
00176
00177 for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
00178 int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
00179 if (sock < 0) {
00180 continue;
00181 }
00182 if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
00183 break;
00184 }
00185 }
00186
00187 m_counter++;
00188 }
00189
00190 void
00191 StatisticsSenderService::determineHostnames(void) {
00192 char tmpName[HOST_NAME_MAX];
00193 if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
00194
00195 m_clienthost = "unknown";
00196 } else {
00197 m_clienthost = tmpName;
00198 }
00199 size_t dot_pos = m_clienthost.find(".");
00200 if (dot_pos == std::string::npos) {
00201 m_clientdomain = "unknown";
00202 } else {
00203 m_clientdomain = m_clienthost.substr(dot_pos+1, m_clienthost.size()-dot_pos-1);
00204 m_clienthost = m_clienthost.substr(0, dot_pos);
00205 }
00206 }
00207
00208 void
00209 StatisticsSenderService::fillUDP(const std::string& siteName, bool usedFallback, std::string &udpinfo) {
00210 std::ostringstream os;
00211
00212
00213 os << "{";
00214 if (!siteName.empty()) {
00215 os << "\"site_name\":\"" << siteName << "\", ";
00216 }
00217 if (usedFallback) {
00218 os << "\"fallback\": true, ";
00219 }
00220 os << "\"user_dn\":\"" << m_userdn << "\", ";
00221 os << "\"client_host\":\"" << m_clienthost << "\", ";
00222 os << "\"client_domain\":\"" << m_clientdomain << "\", ";
00223 os << "\"server_host\":\"" << m_serverhost << "\", ";
00224 os << "\"server_domain\":\"" << m_serverdomain << "\", ";
00225 os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", ";
00226 os << "\"file_lfn\":\"" << m_filelfn << "\", ";
00227
00228
00229 const char * jobId = getJobID();
00230 if (jobId) {
00231 os << "\"app_info\":\"" << jobId << "\", ";
00232 }
00233
00234 if (m_size >= 0) {
00235 os << "\"file_size\":" << m_size << ", ";
00236 }
00237
00238 m_filestats.fillUDP(os);
00239
00240 os << "}";
00241 udpinfo = os.str();
00242 }
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260 static X509 * findEEC(STACK_OF(X509) * certstack) {
00261 int depth = sk_X509_num(certstack);
00262 if (depth == 0) {
00263 return nullptr;
00264 }
00265 int idx = depth-1;
00266 char *priorsubject = nullptr;
00267 char *subject = nullptr;
00268 X509 *x509cert = sk_X509_value(certstack, idx);
00269 for (; x509cert && idx>0; idx--) {
00270 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
00271 if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
00272 break;
00273 }
00274 x509cert = sk_X509_value(certstack, idx);
00275 if (subject) {
00276 OPENSSL_free(subject);
00277 subject = nullptr;
00278 }
00279 }
00280 if (subject) {
00281 OPENSSL_free(subject);
00282 subject = nullptr;
00283 }
00284 return x509cert;
00285 }
00286
00287 static bool
00288 getX509SubjectFromFile(const std::string &filename, std::string &result) {
00289 BIO *biof = nullptr;
00290 STACK_OF(X509) *certs = nullptr;
00291 char *subject = nullptr;
00292 unsigned char *data = nullptr;
00293 char *header = nullptr;
00294 char *name = nullptr;
00295 long len = 0U;
00296
00297 if((biof = BIO_new_file(filename.c_str(), "r"))) {
00298
00299 certs = sk_X509_new_null();
00300 bool encountered_error = false;
00301 while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
00302 if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
00303 X509 * tmp_cert = nullptr;
00304
00305
00306 const unsigned char *p;
00307 p=data;
00308 tmp_cert = d2i_X509(&tmp_cert, &p, len);
00309 if (tmp_cert) {
00310 sk_X509_push(certs, tmp_cert);
00311 } else {
00312 encountered_error = true;
00313 }
00314 }
00315 if (data) { OPENSSL_free(data); data = nullptr;}
00316 if (header) { OPENSSL_free(header); header = nullptr;}
00317 if (name) { OPENSSL_free(name); name = nullptr;}
00318 }
00319 X509 *x509cert = nullptr;
00320 if (!encountered_error && sk_X509_num(certs)) {
00321 x509cert = findEEC(certs);
00322 }
00323 if (x509cert) {
00324 subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
00325 }
00326
00327 if (certs) {
00328 sk_X509_pop_free(certs, X509_free);
00329 x509cert = nullptr;
00330 }
00331 BIO_free(biof);
00332 if (subject) {
00333 result = subject;
00334 OPENSSL_free(subject);
00335 return true;
00336 }
00337 }
00338 return false;
00339 }
00340
00341 bool
00342 StatisticsSenderService::getX509Subject(std::string &result) {
00343 char *filename = getenv("X509_USER_PROXY");
00344 if (filename && getX509SubjectFromFile(filename, result)) {
00345 return true;
00346 }
00347 std::stringstream ss;
00348 ss << "/tmp/x509up_u" << geteuid();
00349 return getX509SubjectFromFile(ss.str(), result);
00350 }