CMS 3D CMS Logo

/data/refman/pasoursint/CMSSW_6_1_2_SLHC4_patch1/src/Utilities/StorageFactory/src/StatisticsSenderService.cc

Go to the documentation of this file.
00001 
00002 #include "Utilities/StorageFactory/interface/StatisticsSenderService.h"
00003 #include "Utilities/StorageFactory/interface/StorageAccount.h"
00004 #include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
00005 #include "FWCore/Catalog/interface/SiteLocalConfig.h"
00006 #include "FWCore/ServiceRegistry/interface/Service.h"
00007 #include "FWCore/Utilities/src/Guid.h"
00008 
00009 #include <string>
00010 
00011 #include <unistd.h>
00012 #include <fcntl.h>
00013 #include <string.h>
00014 
00015 #include <openssl/x509.h>
00016 #include <openssl/pem.h>
00017 
// Remember the freshly sampled counter `x` in the matching member `m_x`.
// The expansion carries its own trailing semicolon, so call sites omit one.
#define UPDATE_STATISTIC(x) m_##x = x;

// Stream `"x":<x - m_x>, ` (the change since the last stored value) into the
// in-scope stream `os`, then store `x` into `m_x` via UPDATE_STATISTIC.
#define UPDATE_AND_OUTPUT_STATISTIC(x)          \
  os << "\"" #x "\":" << (x - m_##x) << ", ";   \
  UPDATE_STATISTIC(x)
00024 
// HOST_NAME_MAX is not defined on every platform (e.g. Mac); provide a
// fallback so that hostname buffers can be statically allocated.
#if !defined(HOST_NAME_MAX)
#define HOST_NAME_MAX 128
#endif

// Names of the environment variables that may carry the job identifier.
#define JOB_UNIQUE_ID_ENV "CRAB_UNIQUE_JOB_ID"
#define JOB_UNIQUE_ID_ENV_V2 "DashboardJobId"
00033 
00034 using namespace edm::storage;
00035 
00036 StatisticsSenderService::FileStatistics::FileStatistics() :
00037   m_read_single_operations(0),
00038   m_read_single_bytes(0),
00039   m_read_single_square(0),
00040   m_read_vector_operations(0),
00041   m_read_vector_bytes(0),
00042   m_read_vector_square(0),
00043   m_read_vector_count_sum(0),
00044   m_read_vector_count_square(0),
00045   m_read_bytes_at_close(0),
00046   m_start_time(time(NULL))
00047 {}
00048 
00049 void
00050 StatisticsSenderService::FileStatistics::fillUDP(std::ostringstream &os) {
00051   const StorageAccount::StorageStats &stats = StorageAccount::summary();
00052   ssize_t read_single_operations = 0;
00053   ssize_t read_single_bytes = 0;
00054   ssize_t read_single_square = 0;
00055   ssize_t read_vector_operations = 0;
00056   ssize_t read_vector_bytes = 0;
00057   ssize_t read_vector_square = 0;
00058   ssize_t read_vector_count_sum = 0;
00059   ssize_t read_vector_count_square = 0;
00060   for (StorageAccount::StorageStats::const_iterator i = stats.begin (); i != stats.end(); ++i) {
00061     if (i->first == "tstoragefile") {
00062       continue;
00063     }
00064     for (StorageAccount::OperationStats::const_iterator j = i->second->begin(); j != i->second->end(); ++j) {
00065       if (j->first == "readv") {
00066         read_vector_operations += j->second.attempts;
00067         read_vector_bytes += j->second.amount;
00068         read_vector_count_square += j->second.vector_square;
00069         read_vector_square += j->second.amount_square;
00070         read_vector_count_sum += j->second.vector_count;
00071       } else if (j->first == "read") {
00072         read_single_operations += j->second.attempts;
00073         read_single_bytes += j->second.amount;
00074         read_single_square += j->second.amount_square;
00075       }
00076     }
00077   }
00078   int64_t single_op_count = read_single_operations - m_read_single_operations;
00079   if (single_op_count > 0) {
00080     double single_sum = read_single_bytes-m_read_single_bytes;
00081     double single_average = single_sum/static_cast<double>(single_op_count);
00082     os << "\"read_single_sigma\":" << sqrt((static_cast<double>(read_single_square-m_read_single_square) - single_average*single_average*single_op_count)/static_cast<double>(single_op_count)) << ", ";
00083     os << "\"read_single_average\":" << single_average << ", ";
00084   }
00085   m_read_single_square = read_single_square;
00086   int64_t vector_op_count = read_vector_operations - m_read_vector_operations;
00087   if (vector_op_count > 0) {
00088     double vector_average = static_cast<double>(read_vector_bytes-m_read_vector_bytes)/static_cast<double>(vector_op_count);
00089     os << "\"read_vector_average\":" << vector_average << ", ";
00090     os << "\"read_vector_sigma\":" << sqrt((static_cast<double>(read_vector_square-m_read_vector_square) - vector_average*vector_average*vector_op_count)/static_cast<double>(vector_op_count)) << ", ";
00091     double vector_count_average = static_cast<double>(read_vector_count_sum-m_read_vector_count_sum)/static_cast<double>(vector_op_count);
00092     os << "\"read_vector_count_average\":" << vector_count_average << ", ";
00093     os << "\"read_vector_count_sigma\":" << sqrt((static_cast<double>(read_vector_count_square-m_read_vector_count_square) - vector_count_average*vector_count_average*vector_op_count)/static_cast<double>(vector_op_count)) << ", ";
00094   }
00095   m_read_vector_square = read_vector_square;
00096   m_read_vector_count_square = read_vector_count_square;
00097   m_read_vector_count_sum = read_vector_count_sum;
00098 
00099   os << "\"read_bytes\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
00100   os << "\"read_bytes_at_close\":" << (read_vector_bytes + read_single_bytes - m_read_vector_bytes - m_read_single_bytes) << ", ";
00101 
00102   // See top of file for macros; not complex, just avoiding copy/paste
00103   UPDATE_AND_OUTPUT_STATISTIC(read_single_operations)
00104   UPDATE_AND_OUTPUT_STATISTIC(read_single_bytes)
00105   UPDATE_AND_OUTPUT_STATISTIC(read_vector_operations)
00106   UPDATE_AND_OUTPUT_STATISTIC(read_vector_bytes)
00107 
00108   os << "\"start_time\":" << m_start_time << ", ";
00109   m_start_time = time(NULL);
00110   // NOTE: last entry doesn't have the trailing comma.
00111   os << "\"end_time\":" << m_start_time;
00112 }
00113 
00114 StatisticsSenderService::StatisticsSenderService(edm::ParameterSet const& /*pset*/, edm::ActivityRegistry& ar) :
00115   m_clienthost("unknown"),
00116   m_clientdomain("unknown"),
00117   m_serverhost("unknown"),
00118   m_serverdomain("unknown"),
00119   m_filelfn("unknown"),
00120   m_filestats(),
00121   m_guid(Guid().toString()),
00122   m_counter(0),
00123   m_size(-1),
00124   m_userdn("unknown")
00125 {
00126   determineHostnames();
00127   ar.watchPreCloseFile(this, &StatisticsSenderService::filePreCloseEvent);
00128   if (!getX509Subject(m_userdn)) {
00129     m_userdn = "unknown";
00130   }
00131 }
00132 
00133 const char *
00134 StatisticsSenderService::getJobID() {
00135   const char * id = getenv(JOB_UNIQUE_ID_ENV);
00136   // Dashboard developers requested that we migrate to this environment variable.
00137   return id ? id : getenv(JOB_UNIQUE_ID_ENV_V2);
00138 }
00139 
00140 void
00141 StatisticsSenderService::setCurrentServer(const std::string &servername) {
00142   size_t dot_pos = servername.find(".");
00143   if (dot_pos == std::string::npos) {
00144     m_serverhost = servername.substr(0, servername.find(":"));
00145     m_serverdomain = "unknown";
00146   } else {
00147     m_serverhost = servername.substr(0, dot_pos);
00148     m_serverdomain = servername.substr(dot_pos+1, servername.find(":")-dot_pos-1);
00149     if (m_serverdomain.empty()) {
00150       m_serverdomain = "unknown";
00151     }
00152   }
00153 }
00154 
00155 void
00156 StatisticsSenderService::setSize(size_t size) {
00157   m_size = size;
00158 }
00159 
00160 void
00161 StatisticsSenderService::filePreCloseEvent(std::string const& lfn, bool usedFallback) {
00162   m_filelfn = lfn;
00163 
00164   edm::Service<edm::SiteLocalConfig> pSLC;
00165   if (!pSLC.isAvailable()) {
00166     return;
00167   }
00168 
00169   const struct addrinfo * addresses = pSLC->statisticsDestination();
00170   if (!addresses) {
00171     return;
00172   }
00173 
00174   std::string results;
00175   fillUDP(pSLC->siteName(), usedFallback, results);
00176 
00177   for (const struct addrinfo *address = addresses; address != nullptr; address = address->ai_next) {
00178     int sock = socket(address->ai_family, address->ai_socktype, address->ai_protocol);
00179     if (sock < 0) {
00180       continue;
00181     }
00182     if (sendto(sock, results.c_str(), results.size(), 0, address->ai_addr, address->ai_addrlen) >= 0) {
00183       break; 
00184     }
00185   }
00186 
00187   m_counter++;
00188 }
00189 
00190 void
00191 StatisticsSenderService::determineHostnames(void) {
00192   char tmpName[HOST_NAME_MAX];
00193   if (gethostname(tmpName, HOST_NAME_MAX) != 0) {
00194     // Sigh, no way to log errors from here.
00195     m_clienthost = "unknown";
00196   } else {
00197     m_clienthost = tmpName;
00198   }
00199   size_t dot_pos = m_clienthost.find(".");
00200   if (dot_pos == std::string::npos) {
00201     m_clientdomain = "unknown";
00202   } else {
00203     m_clientdomain = m_clienthost.substr(dot_pos+1, m_clienthost.size()-dot_pos-1);
00204     m_clienthost = m_clienthost.substr(0, dot_pos);
00205   }
00206 }
00207 
00208 void
00209 StatisticsSenderService::fillUDP(const std::string& siteName, bool usedFallback, std::string &udpinfo) {
00210   std::ostringstream os;
00211 
00212   // Header - same for all IO accesses
00213   os << "{";
00214   if (!siteName.empty()) {
00215     os << "\"site_name\":\"" << siteName << "\", ";
00216   }
00217   if (usedFallback) {
00218     os << "\"fallback\": true, ";
00219   }
00220   os << "\"user_dn\":\"" << m_userdn << "\", ";
00221   os << "\"client_host\":\"" << m_clienthost << "\", ";
00222   os << "\"client_domain\":\"" << m_clientdomain << "\", ";
00223   os << "\"server_host\":\"" << m_serverhost << "\", ";
00224   os << "\"server_domain\":\"" << m_serverdomain << "\", ";
00225   os << "\"unique_id\":\"" << m_guid << "-" << m_counter << "\", ";
00226   os << "\"file_lfn\":\"" << m_filelfn << "\", ";
00227   // Dashboard devs requested that we send out no app_info if a job ID
00228   // is not present in the environment.
00229   const char * jobId = getJobID();
00230   if (jobId) {
00231     os << "\"app_info\":\"" << jobId << "\", ";
00232   }
00233 
00234   if (m_size >= 0) {
00235     os << "\"file_size\":" << m_size << ", ";
00236   }
00237 
00238   m_filestats.fillUDP(os);
00239 
00240   os << "}";
00241   udpinfo = os.str();
00242 }
00243 
00244 /*
00245  * Pull the X509 user subject from the environment.
00246  * Based on initial code from the Frontier client:
00247  *   http://cdcvs.fnal.gov/cgi-bin/public-cvs/cvsweb-public.cgi/~checkout~/frontier/client/frontier.c?rev=1.57&content-type=text/plain
00248  * This was further extended by walking up the returned chain similar to the Globus function
00249  *   globus_gsi_cert_utils-6.6/library/globus_gsi_cert_utils.c:globus_gsi_cert_utils_get_eec
00250  *   globus_gsi_credential-3.5/library/globus_gsi_credential.c:globus_gsi_cred_read_proxy_bio
00251  */
00252 
00253 /* 
00254  * Given a stack of x509 proxies, take a guess at the EEC.
00255  * Assumes the proxies are in reverse sorted order and looks for the first
00256  * proxy which is not a substring of the prior proxy.
00257  * THIS DOES NOT VERIFY THE RESULTS, and is a best-effort GUESS.
00258  * Again, DO NOT REUSE THIS CODE THINKING IT VERIFIES THE CHAIN!
00259  */
00260 static X509 * findEEC(STACK_OF(X509) * certstack) {
00261   int depth = sk_X509_num(certstack);
00262   if (depth == 0) {
00263     return nullptr;
00264   }
00265   int idx = depth-1;
00266   char *priorsubject = nullptr;
00267   char *subject = nullptr;
00268   X509 *x509cert = sk_X509_value(certstack, idx);
00269   for (; x509cert && idx>0; idx--) {
00270     subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
00271     if (subject && priorsubject && (strncmp(subject, priorsubject, strlen(subject)) != 0)) {
00272       break;
00273     }
00274     x509cert = sk_X509_value(certstack, idx);
00275     if (subject) {
00276       OPENSSL_free(subject);
00277       subject = nullptr;
00278     }
00279   }
00280   if (subject) {
00281     OPENSSL_free(subject);
00282     subject = nullptr;
00283   }
00284   return x509cert;
00285 }
00286 
00287 static bool
00288 getX509SubjectFromFile(const std::string &filename, std::string &result) {
00289   BIO *biof = nullptr;
00290   STACK_OF(X509) *certs = nullptr;
00291   char *subject = nullptr;
00292   unsigned char *data = nullptr;
00293   char *header = nullptr;
00294   char *name = nullptr;
00295   long len = 0U;
00296 
00297   if((biof = BIO_new_file(filename.c_str(), "r")))  {
00298 
00299     certs = sk_X509_new_null();
00300     bool encountered_error = false;
00301     while ((!encountered_error) && (!BIO_eof(biof)) && PEM_read_bio(biof, &name, &header, &data, &len)) {
00302       if (strcmp(name, PEM_STRING_X509) == 0 || strcmp(name, PEM_STRING_X509_OLD) == 0) {
00303         X509 * tmp_cert = nullptr;
00304         // See WARNINGS section in http://www.openssl.org/docs/crypto/d2i_X509.html
00305         // Without this cmsRun crashes on a mac with a valid grid proxy.
00306         const unsigned char *p;
00307         p=data;
00308         tmp_cert = d2i_X509(&tmp_cert, &p, len);
00309         if (tmp_cert) {
00310           sk_X509_push(certs, tmp_cert);
00311         } else {
00312           encountered_error = true;
00313         }
00314       } // Note we ignore any proxy key in the file.
00315       if (data) { OPENSSL_free(data); data = nullptr;}
00316       if (header) { OPENSSL_free(header); header = nullptr;}
00317       if (name) { OPENSSL_free(name); name = nullptr;}
00318     }
00319     X509 *x509cert = nullptr;
00320     if (!encountered_error && sk_X509_num(certs)) {
00321       x509cert = findEEC(certs);
00322     }
00323     if (x509cert) {
00324       subject = X509_NAME_oneline(X509_get_subject_name(x509cert),0,0);
00325     }
00326     // Note we do not free x509cert directly, as it's still owned by the certs stack.
00327     if (certs) {
00328       sk_X509_pop_free(certs, X509_free);
00329       x509cert = nullptr;
00330     }
00331     BIO_free(biof);
00332     if (subject) {
00333       result = subject;
00334       OPENSSL_free(subject);
00335      return true;
00336     }
00337   }
00338   return false;
00339 }
00340 
00341 bool
00342 StatisticsSenderService::getX509Subject(std::string &result) {
00343   char *filename = getenv("X509_USER_PROXY");
00344   if (filename && getX509SubjectFromFile(filename, result)) {
00345     return true;
00346   }
00347   std::stringstream ss;
00348   ss << "/tmp/x509up_u" << geteuid();
00349   return getX509SubjectFromFile(ss.str(), result);
00350 }