cmsPerfSuiteHarvest.py
#!/usr/bin/env python
#A script to "harvest" PerfSuite work directories, producing an xml file with all the data ready to be uploaded to the PerfSuiteDB DB.
import sys, os, re
import getopt

from Validation.Performance.parserTimingReport import *
from Validation.Performance.cmssw_exportdb_xml import *
import Validation.Performance.cmssw_exportdb_xml as cmssw_exportdb_xml
from Validation.Performance.parserPerfsuiteMetadata import parserPerfsuiteMetadata
from xml.dom import minidom  # needed for the xmldoc document below

from Validation.Performance.FileNamesHelper import *
import Validation.Performance.parserEdmSize as parserEdmSize

import glob
from commands import getstatusoutput
""" Indicates whether CMSSW is available: use False when it is (the normal case); on our testing machine it is not, so use True there. """
_TEST_RUN = False

""" global variables """
test_timing_report_log = re.compile("TimingReport.log$", re.IGNORECASE)
test_igprof_report_log = re.compile("^(.*)(IgProfMem|IgProfPerf)\.gz", re.IGNORECASE)
test_memcheck_report_log = re.compile("^(.*)memcheck_vlgd.xml", re.IGNORECASE)
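# Illustrative (hypothetical) file names these patterns are meant to match:
#   TTbar_GEN,SIM_TimingReport.log       -> test_timing_report_log
#   TTbar___GEN,SIM___..._IgProfMem.gz   -> test_igprof_report_log
#   candle_..._memcheck_vlgd.xml         -> test_memcheck_report_log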


xmldoc = minidom.Document()
release = None
steps = {}
candles = {}
pileups = {}
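# steps/candles/pileups are used as sets: their keys are later joined into the
# human-readable name of the output xml file (see file_name in __main__ below).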

def usage(argv):
    script = argv[0]
    return """
    Usage: %(script)s [-v cmssw_version] [--version=cmssw_version]

    if the cmssw version is in the system's environment (after running cmsenv):
    $ %(script)s

    otherwise one must specify the cmssw version:
    $ %(script)s --version=CMSSW_3_2_0
    $ %(script)s -v CMSSW_3_2_0

    """ % locals()

def get_params(argv):
    """
    Returns the CMSSW version to be used, taken from:
    * the command line parameter or
    * the environment variable
    in case of error returns None

    Also returns the directory to put the xml files into; if none is given, returns ""
    """

    """ try to get the version from the command line arguments """
    #print argv
    #FIXME: this should be rewritten using getopt properly
    version = None
    #xml_dir = "cmsperfvm:/data/projects/conf/PerfSuiteDB/xml_dropbox" #Set this as default (assume change in write_xml to write to remote machines)
    #NB write_xml is in Validation/Performance/python/cmssw_exportdb_xml.py
    #Setting the default to write to a local directory:
    xml_dir = "PerfSuiteDBData"
    try:
        opts, args = getopt.getopt(argv[1:], "v:", ["version=", "outdir="])
    except getopt.GetoptError, e:
        # bail out on bad options instead of crashing on the undefined 'opts' below
        print e
        print usage(argv)
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-v", "--version"):
            version = arg
        if opt == "--outdir":
            xml_dir = arg

    """ if not given, get it from the environment """
    if not version:
        try:
            version = os.environ["CMSSW_VERSION"]
        except KeyError:
            pass

    return (version, xml_dir)
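# e.g. get_params(["harvest.py", "-v", "CMSSW_3_2_0"]) -> ("CMSSW_3_2_0", "PerfSuiteDBData")   (illustrative)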

def _eventContent_DEBUG(edm_report):
    # for testing / information
    EC_count = {}
    if not _TEST_RUN:
        # count the products in each event-content
        for prod in edm_report:
            ecs = parseEventContent.List_ECs_forProduct(prod)
            for ec in ecs:
                if not EC_count.has_key(ec):
                    EC_count[ec] = []
                EC_count[ec].append(prod)
        # print out the statistics
        for (ec, prods) in EC_count.items():
            print "==== %s EVENT CONTENT: have %d items, the listing is: ===" % (ec, len(prods))
            # list of products
            print "\n *".join(["%(cpp_type)s_%(module_name)s_%(module_label)s" % prod for prod in prods])


def assign_event_content_for_product(product):
    """ returns the modified product, with the event-content relationship added """

    if not _TEST_RUN:
        product["event_content"] = ",".join(parseEventContent.List_ECs_forProduct(product))
    return product

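# A product entry (as produced by parserEdmSize and extended above) is a dict like, illustratively:
#   {"cpp_type": "reco::TrackCollection", "module_name": "generalTracks",
#    "module_label": "", "event_content": "RECO,AOD"}

# get_modules_sequences_relationships() below returns, illustratively:
#   [{"name": "reconstruction_step", "modules": "moduleA,moduleB"}, ...]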
def get_modules_sequences_relationships():
    (sequenceWithModules, sequenceWithModulesString) = ModuleToSequenceAssign.assignModulesToSeqs()
    return [{"name": seq, "modules": ",".join(modules)} for (seq, modules) in sequenceWithModulesString.items()]

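# Each export* function below attaches a parsed job report to the matching entry in
# runinfo['TestResults'], matching on candle, pileup_type, conditions and event_content;
# jobs with no matching test entry are appended to runinfo['unrecognized_jobs'] instead.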
def exportIgProfReport(path, igProfReport, igProfType, runinfo):
    jobID = igProfReport["jobID"]
    #print jobID
    candleLong = os.path.split(path)[1].replace("_IgProf_Perf", "").replace("_IgProf_Mem", "").replace("_PU", "")
    found = False
    #print igProfType
    if runinfo['TestResults'].has_key(igProfType):
        for result in runinfo['TestResults'][igProfType]:
            if candleLong == result["candle"] and jobID["pileup_type"] == result['pileup_type'] and jobID["conditions"] == result['conditions'] and jobID["event_content"] == result['event_content']:
                jobID["candle"] = jobID["candle"].upper()
                if not result.has_key("jobs"):
                    result['jobs'] = []
                result['jobs'].append(igProfReport)
                found = True
                break

    if not found:
        print "============ (almost) ERROR: ENTRY NOT FOUND in cmsPerfSuite.log, exporting as a separate entry ======== "
        print "JOB ID: %s " % str(jobID)
        print " ====================== "
        runinfo['unrecognized_jobs'].append(igProfReport)
        #export_xml(xml_doc = xmldoc, **igProfReport)


def exportTimeSizeJob(path, timeSizeReport, runinfo):
    candleLong = os.path.split(path)[1].replace("_TimeSize", "").replace("_PU", "")
    jobID = timeSizeReport["jobID"]

    # search for a run Test to which our JOB could belong
    found = False
    if runinfo['TestResults'].has_key('TimeSize'):
        for result in runinfo['TestResults']['TimeSize']:
            #print result
            """ If this is the testResult which fits the TimeSize job """
            #TODO: we do not check the step when assigning because of the different names; check if this is really OK. Make a decision later on which step name to use, the long or the short one
            #and jobID["step"] in result['steps'].split(parserPerfsuiteMetadata._LINE_SEPARATOR)
            if result['candle'] == candleLong and jobID["pileup_type"] == result['pileup_type'] and jobID["conditions"] == result['conditions'] and jobID["event_content"] == result['event_content']:
                #print result
                if not result.has_key("jobs"):
                    result['jobs'] = []
                result['jobs'].append(timeSizeReport)
                found = True
                break

    if not found:
        print "============ (almost) ERROR: ENTRY NOT FOUND in cmsPerfSuite.log, exporting as a separate entry ======== "
        print "JOB ID: %s " % str(jobID)
        print " ====================== "
        runinfo['unrecognized_jobs'].append(timeSizeReport)
        #export_xml(xml_doc = xmldoc, **timeSizeReport)

def exportMemcheckReport(path, MemcheckReport, runinfo):
    candleLong = os.path.split(path)[1].replace("_Memcheck", "").replace("_PU", "")
    jobID = MemcheckReport["jobID"]

    # search for a run Test to which our JOB could belong
    found = False
    if runinfo['TestResults'].has_key('Memcheck'):
        for result in runinfo['TestResults']['Memcheck']:
            #print result
            #print jobID
            """ If this is the testResult which fits the Memcheck job """
            #TODO: we do not check the step when assigning because of the different names; check if this is really OK. Make a decision later on which step name to use, the long or the short one
            #and jobID["step"] in result['steps'].split(parserPerfsuiteMetadata._LINE_SEPARATOR)
            if result['candle'] == candleLong and jobID["pileup_type"] == result['pileup_type'] and jobID["conditions"] == result['conditions'] and jobID["event_content"] == result['event_content']:
                #print result
                if not result.has_key("jobs"):
                    result['jobs'] = []
                result['jobs'].append(MemcheckReport)
                found = True
                break

    if not found:
        print "============ (almost) ERROR: ENTRY NOT FOUND in cmsPerfSuite.log, exporting as a separate entry ======== "
        print "JOB ID: %s " % str(jobID)
        print " ====================== "
        runinfo['unrecognized_jobs'].append(MemcheckReport)

def process_timesize_dir(path, runinfo):
    global release, event_content, conditions

    """ if the release is not provided explicitly we take it from the Simulation candles file """
    if (not release):
        release_fromlogfile = read_SimulationCandles(path)
        release = release_fromlogfile
        print "release from simulation candles: %s" % release

    if (not release):
        raise Exception("the release was not found!")

    """ process the TimingReport log files """

    # get the file list
    files = os.listdir(path)
    timing_report_files = [os.path.join(path, f) for f in files
                           if test_timing_report_log.search(f)
                           and os.path.isfile(os.path.join(path, f))]

    # print timing_report_files
    for timelog_f in timing_report_files:
        print "\nProcessing file: %s" % timelog_f
        print "------- "

        jobID = getJobID_fromTimeReportLogName(os.path.join(path, timelog_f))
        print "jobID: %s" % str(jobID)
        (candle, step, pileup_type, conditions, event_content) = jobID
        jobID = dict(zip(("candle", "step", "pileup_type", "conditions", "event_content"), jobID))
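        # jobID is now a dict like, with illustrative values:
        #   {"candle": "TTbar", "step": "GEN,SIM", "pileup_type": "",
        #    "conditions": "MC_31X_V3", "event_content": "RAWSIM"}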
        print "Dictionary based jobID %s: " % str(jobID)

        #if any of the jobID fields except pileup_type is empty we discard the job, as all of those are jobID keys and we must have them
        discard = len([key for key, value in jobID.items() if key != "pileup_type" and not value])
        if discard:
            print " ====================== The job HAS BEEN DISCARDED =============== "
            print " NOT ALL DATA WAS AVAILABLE "
            print " JOB ID = %s " % str(jobID)
            print " ======================= end ===================================== "
            continue

        # TODO: automatically detect the type of the report file!!!
        (mod_timelog, evt_timelog, rss_data, vsize_data) = loadTimeLog(timelog_f)

        mod_timelog = processModuleTimeLogData(mod_timelog, groupBy = "module_name")
        print "Number of modules grouped by (module_label+module_name): %s" % len(mod_timelog)

        # add to the list to generate the readable filename :)
        steps[step] = 1
        candles[candle] = 1
        if pileup_type == "":
            pileups["NoPileUp"] = 1
        else:
            pileups[pileup_type] = 1

        # root file size (number)
        root_file_size = getRootFileSize(path = path, candle = candle, step = step.replace(':', '='))
        # number of events
        num_events = read_ConfigurationFromSimulationCandles(path = path, step = step, is_pileup = pileup_type)["num_events"]

        #EdmSize
        edm_report = parserEdmSize.getEdmReport(path = path, candle = candle, step = step)
        if edm_report != False:
            try:
                # add event content data
                edm_report = map(assign_event_content_for_product, edm_report)
                # for testing / information
                _eventContent_DEBUG(edm_report)
            except Exception, e:
                print e

        timeSizeReport = {
            "jobID": jobID,
            "release": release,
            "timelog_result": (mod_timelog, evt_timelog, rss_data, vsize_data),
            "metadata": {"testname": "TimeSize", "root_file_size": root_file_size, "num_events": num_events},
            "edmSize_result": edm_report
        }

        # export to xml: actually the exporting gets suspended and put into runinfo
        exportTimeSizeJob(path, timeSizeReport, runinfo)

def process_memcheck_dir(path, runinfo):
    global release, event_content, conditions

    """ if the release is not provided explicitly we take it from the Simulation candles file """
    if (not release):
        release_fromlogfile = read_SimulationCandles(path)
        release = release_fromlogfile
        print "release from simulation candles: %s" % release

    if (not release):
        raise Exception("the release was not found!")

    """ process the vlgd files """

    # get the file list
    files = os.listdir(path)
    memcheck_files = [os.path.join(path, f) for f in files
                      if test_memcheck_report_log.search(f)
                      and os.path.isfile(os.path.join(path, f))]

    if len(memcheck_files) == 0: # fast protection for old runs, where the _vlgd files were not created...
        print "No _vlgd files found!"
    else:
        for file in memcheck_files:
            jobID = getJobID_fromMemcheckLogName(os.path.join(path, file))

            (candle, step, pileup_type, conditions, event_content) = jobID

            print "jobID: %s" % str(jobID)
            jobID = dict(zip(("candle", "step", "pileup_type", "conditions", "event_content"), jobID))

            print "Dictionary based jobID %s: " % str(jobID)

            #if any of the jobID fields except pileup_type is empty we discard the job, as all of those are jobID keys and we must have them
            discard = len([key for key, value in jobID.items() if key != "pileup_type" and not value])
            if discard:
                print " ====================== The job HAS BEEN DISCARDED =============== "
                print " NOT ALL DATA WAS AVAILABLE "
                print " JOB ID = %s " % str(jobID)
                print " ======================= end ===================================== "
                continue

            # add to the list to generate the readable filename :)
            steps[step] = 1
            candles[candle.upper()] = 1
            if pileup_type == "":
                pileups["NoPileUp"] = 1
            else:
                pileups[pileup_type] = 1

            memerror = getMemcheckError(path)

            MemcheckReport = {
                "jobID": jobID,
                "release": release,
                "memcheck_errors": {"error_num": memerror},
                "metadata": {"testname": "Memcheck"},
            }

            # export to xml: actually the exporting gets suspended and put into runinfo
            exportMemcheckReport(path, MemcheckReport, runinfo)

def getMemcheckError(path):
    """ counts the total number of <error> tags over all *memcheck_vlgd.xml files under path """
    globbed = glob.glob(os.path.join(path, "*memcheck_vlgd.xml"))

    errnum = 0

    for f in globbed:
        #print f
        cmd = "grep '<error>' " + f + " | wc -l "
        p = os.popen(cmd, 'r')
        errnum += int(p.readlines()[0])

    return errnum


def process_igprof_dir(path, runinfo):
    global release, event_content, conditions

    """ if the release is not provided explicitly we take it from the Simulation candles file """
    if (not release):
        release_fromlogfile = read_SimulationCandles(path)
        release = release_fromlogfile
        print "release from simulation candles: %s" % release

    if (not release):
        raise Exception("the release was not found!")

    """ process the IgProf sql3 files """

    # get the file list
    files = os.listdir(path)
    igprof_files = [os.path.join(path, f) for f in files
                    if test_igprof_report_log.search(f)
                    and os.path.isfile(os.path.join(path, f))]

    if len(igprof_files) == 0: # No files...
        print "No igprof files found!"
    else:
        for file in igprof_files:
            jobID = getJobID_fromIgProfLogName(file)

            (candle, step, pileup_type, conditions, event_content) = jobID

            print "jobID: %s" % str(jobID)
            jobID = dict(zip(("candle", "step", "pileup_type", "conditions", "event_content"), jobID))

            print "Dictionary based jobID %s: " % str(jobID)

            igProfType = path.split("/")[-1].replace("TTbar_", "").replace("MinBias_", "").replace("PU_", "")

            #if any of the jobID fields except pileup_type is empty we discard the job, as all of those are jobID keys and we must have them
            discard = len([key for key, value in jobID.items() if key != "pileup_type" and not value])
            if discard:
                print " ====================== The job HAS BEEN DISCARDED =============== "
                print " NOT ALL DATA WAS AVAILABLE "
                print " JOB ID = %s " % str(jobID)
                print " ======================= end ===================================== "
                continue

            # add to the list to generate the readable filename :)
            steps[step] = 1
            candles[candle.upper()] = 1
            if pileup_type == "":
                pileups["NoPileUp"] = 1
            else:
                pileups[pileup_type] = 1

            igs = getIgSummary(path)
            #print igs

            igProfReport = {
                "jobID": jobID,
                "release": release,
                "igprof_result": igs,
                "metadata": {"testname": igProfType},
            }

            # print igProfReport
            # export to xml: actually the exporting gets suspended and put into runinfo
            exportIgProfReport(path, igProfReport, igProfType, runinfo)

#get IgProf summary information from the sql3 files
def getIgSummary(path):
    igresult = []
    globbed = glob.glob(os.path.join(path, "*.sql3"))

    for f in globbed:
        #print f
        profileInfo = getSummaryInfo(f)
        if not profileInfo:
            continue
        cumCounts, cumCalls = profileInfo
        dump, architecture, release, rest = f.rsplit("/", 3)
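        # the sql3 file name is expected to be "___"-separated, illustratively:
        #   <candle>___<sequence>___<pileup>___<conditions>___<process>___<counterType>___<events>.sql3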
        candle, sequence, pileup, conditions, process, counterType, events = rest.split("___")
        events = events.replace(".sql3", "")
        igresult.append({"counter_type": counterType, "event": events, "cumcounts": cumCounts, "cumcalls": cumCalls})

    # fail-safe (nasty) fix for the "diff" entries (kept even if it gets fixed in the sql files, so it won't break this again...)
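    # An entry whose "event" field looks like "8_diff_1" (illustrative) holds a difference profile:
    # its cumulative counts/calls are recomputed below as (last event) - (first event),
    # taken from the matching plain entries of the same counter type.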
    for ig in igresult:
        if 'diff' in ig['event']:
            eventLast, eventOne = ig['event'].split('_diff_')
            for part in igresult:
                if part['counter_type'] == ig['counter_type'] and part['event'] == eventOne:
                    cumcountsOne = part['cumcounts']
                    cumcallsOne = part['cumcalls']
                if part['counter_type'] == ig['counter_type'] and part['event'] == eventLast:
                    cumcountsLast = part['cumcounts']
                    cumcallsLast = part['cumcalls']
            ig['cumcounts'] = cumcountsLast - cumcountsOne
            ig['cumcalls'] = cumcallsLast - cumcallsOne

    return igresult

def getSummaryInfo(database):
    summary_query = """SELECT counter, total_count, total_freq, tick_period
                       FROM summary;"""
    error, output = doQuery(summary_query, database)
    if error or not output or output.count("\n") > 1:
        return None
    counter, total_count, total_freq, tick_period = output.split("@@@")
    if counter == "PERF_TICKS":
        return float(tick_period) * float(total_count), int(total_freq)
    else:
        return int(total_count), int(total_freq)
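# e.g. a summary row "PERF_TICKS@@@123456@@@789@@@0.000001" (illustrative values) yields
# (0.123456..., 789): PERF_TICKS counts are scaled by the tick period, other counters are returned as plain counts.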

def doQuery(query, database):
    # prefer the system sqlite3 binary, fall back to a shared AFS copy
    if os.path.exists("/usr/bin/sqlite3"):
        sqlite = "/usr/bin/sqlite3"
    else:
        sqlite = "/afs/cern.ch/user/e/eulisse/www/bin/sqlite"
    return getstatusoutput("echo '%s' | %s -separator @@@ %s" % (query, sqlite, database))
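# e.g. doQuery("SELECT counter FROM summary;", "profile.sql3") -> (0, "PERF_TICKS")   (illustrative)
# i.e. a (status, "@@@"-separated output) pair, as returned by commands.getstatusoutput.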

#TimeSize
def searchTimeSizeFiles(runinfo):
    """ so far we will use the current dir to search in """
    path = os.getcwd()
    #print path
    print 'full path =', os.path.abspath(path)

    files = os.listdir(path)

    test_timeSizeDirs = re.compile("_TimeSize$", re.IGNORECASE)
    timesize_dirs = [os.path.join(path, f) for f in files if test_timeSizeDirs.search(f) and os.path.isdir(os.path.join(path, f))]

    for timesize_dir in timesize_dirs:
        # print timesize_dir
        process_timesize_dir(timesize_dir, runinfo)

#Memcheck
def searchMemcheckFiles(runinfo):
    """ so far we will use the current dir to search in """
    path = os.getcwd()
    #print path
    print 'full path =', os.path.abspath(path)

    files = os.listdir(path)

    test_MemcheckDirs = re.compile("_Memcheck(.*)$", re.IGNORECASE)
    memcheck_dirs = [os.path.join(path, f) for f in files if test_MemcheckDirs.search(f) and os.path.isdir(os.path.join(path, f))]

    for memcheck_dir in memcheck_dirs:
        print memcheck_dir
        process_memcheck_dir(memcheck_dir, runinfo)

#IgProf
def searchIgProfFiles(runinfo):
    """ so far we will use the current dir to search in """
    path = os.getcwd()
    #print path
    print 'full path =', os.path.abspath(path)

    files = os.listdir(path)

    test_IgProfDirs = re.compile("_IgProf(.*)$", re.IGNORECASE)
    igprof_dirs = [os.path.join(path, f) for f in files if test_IgProfDirs.search(f) and os.path.isdir(os.path.join(path, f))]

    for igprof_dir in igprof_dirs:
        print igprof_dir
        process_igprof_dir(igprof_dir, runinfo)

def exportSequences():
    """ Exports the sequences to the XML Doc """
    try:
        env_cmssw_version = os.environ["CMSSW_VERSION"]
    except KeyError:
        print "<<<<< ====== Error: cannot get CMSSW version [just an integrity check for sequences]. \
Is the CMSSW environment initialized? (use cmsenv) ==== >>>>"
        env_cmssw_version = None

    print " ==== exporting the sequences. Loading files for the currently loaded CMSSW version: %s, while the CMSSW we are currently harvesting is %s ===" % (env_cmssw_version, release)
    xml_export_Sequences(xml_doc = xmldoc, sequences = get_modules_sequences_relationships(), release = release)


if __name__ == "__main__":
    #searchFiles()
    #TODO: use an option parser! This is messy.

    (release, output_dir) = get_params(sys.argv)

    if not release:
        """ print usage(sys.argv)
        sys.exit(2) """
        print "The version was not provided explicitly, will try to get one from the SimulationCandles file"

    # Export the metadata from cmsPerfSuite.log (in the current working directory!)
    print "Parsing cmsPerfSuite.log: getting all the metadata concerning the run"
    p = parserPerfsuiteMetadata(os.getcwd())
    run_info = p.parseAll()

    print "Loading Sequences and Event-Content(s). Please wait..."

    Sequences_OK = False
    EventContents_OK = False

    if not _TEST_RUN:
        try:
            import Validation.Performance.ModuleToSequenceAssign as ModuleToSequenceAssign
            Sequences_OK = True
        except Exception, e:
            print e
        try:
            import Validation.Performance.parseEventContent as parseEventContent
            EventContents_OK = True
        except Exception, e:
            print e

    print "Parsing TimeSize report"
    # Search for TimeSize files: EdmSize, TimingReport
    searchTimeSizeFiles(run_info)
    print "Parsing IgProf report"
    # Search for IgProf files
    searchIgProfFiles(run_info)
    print "Parsing Memcheck report"
    # Search for Memcheck files
    searchMemcheckFiles(run_info)
    #print run_info

    print "Exporting sequences and event-content rules"
    if not _TEST_RUN:
        """ for testing on a laptop we have no CMSSW """
        # export sequences (for the currently loaded CMSSW)
        if Sequences_OK:
            exportSequences()

        if EventContents_OK:
            # export the event-content rules
            eventContentRules = parseEventContent.getTxtEventContentRules()
            cmssw_exportdb_xml.exportECRules(xmldoc, eventContentRules)

    cmssw_exportdb_xml.exportRunInfo(xmldoc, run_info, release = release)

    #save the XML file, TODO: change the fileName after introducing the JobID
    import datetime
    now = datetime.datetime.now()
    #Changing slightly the XML filename format
    #FIXME: review this convention and archive the xml in a separate CASTOR xml directory for quick recovery of the DB...
    file_name = "%s___%s___%s___%s___%s___%s___%s.xml" % (release, "_".join(steps.keys()), "_".join(candles.keys()), "_".join(pileups.keys()), event_content, conditions, now.isoformat())
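    # e.g. (illustrative values):
    #   CMSSW_3_2_0___GEN,SIM___TTBAR___NoPileUp___RAWSIM___MC_31X_V3___2009-07-01T12:00:00.xml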
    print "Writing the output to: %s " % file_name

    write_xml(xmldoc, output_dir, file_name) #change this function to be able to handle directories on remote machines (via tar pipes for now; could always revert to rsync later)
    #NB write_xml is in Validation/Performance/python/cmssw_exportdb_xml.py