CMS 3D CMS Logo

runTheMatrix.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import sys, os
4 
5 from Configuration.PyReleaseValidation.MatrixReader import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector,performInjectionOptionTest
8 
9 # ================================================================================
10 
11 def showRaw(opt):
12 
13  mrd = MatrixReader(opt)
14  mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
15 
16  return 0
17 
18 # ================================================================================
19 
20 def runSelected(opt):
21 
22  mrd = MatrixReader(opt)
23  mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
24 
25  # test for wrong input workflows
26  if opt.testList:
27  definedWf = [dwf.numId for dwf in mrd.workFlows]
28  definedSet = set(definedWf)
29  testSet = set(opt.testList)
30  undefSet = testSet - definedSet
31  if len(undefSet)>0: raise ValueError('Undefined workflows: '+', '.join(map(str,list(undefSet))))
32  duplicates = [wf for wf in testSet if definedWf.count(wf)>1 ]
33  if len(duplicates)>0: raise ValueError('Duplicated workflows: '+', '.join(map(str,list(duplicates))))
34 
35  ret = 0
36  if opt.show:
37  mrd.show(opt.testList, opt.extended, opt.cafVeto)
38  if opt.testList : print('testListected items:', opt.testList)
39  else:
40  mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41  ret = mRunnerHi.runTests(opt)
42 
43  if opt.wmcontrol:
44  if ret!=0:
45  print('Cannot go on with wmagent injection with failing workflows')
46  else:
47  wfInjector = MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48  ret= wfInjector.prepare(mrd,
49  mRunnerHi.runDirs)
50  if ret==0:
51  wfInjector.upload()
52  wfInjector.submit()
53  return ret
54 
55 # ================================================================================
56 
57 if __name__ == '__main__':
58 
59  #this can get out of here
60  predefinedSet={
61  'limited' : [
62  # See README for further details
63 
66  5.1, # TTbar_8TeV_TuneCUETP8M1 FastSim
67  8, # RelValBeamHalo Cosmics
68  9.0, # RelValHiggs200ChargedTaus
69  25, # RelValTTbar
70  101.0, # SingleElectronE120EHCAL + ECALHCAL.customise + fullMixCustomize_cff.setCrossingFrameOn
71 
72  # Run2
73  7.3, # UndergroundCosmicSPLooseMu
74  1306.0, # RelValSingleMuPt1_UP15
75  1330, # RelValZMM_13
76  135.4, # ZEE_13TeV_TuneCUETP8M1
77  25202.0, # RelValTTbar_13 PU = AVE_35_BX_25ns
78  250202.181, # RelValTTbar_13 PREMIX
79 
80  # Run3
81  11634.0, # TTbar_14TeV 2021
82  13234.0, # RelValTTbar_14TeV 2021 FastsSim
83  12434.0, # RelValTTbar_14TeV 2023
84  12834.0, # RelValTTbar_14TeV 2024
85  12846.0, # RelValZEE_13 2024
86  13034.0, # RelValTTbar_14TeV 2024 PU = Run3_Flat55To75_PoissonOOTPU
87  12834.7, # RelValTTbar_14TeV 2024 mkFit
88  14034.0, # RelValTTbar_14TeV Run3_2023_FastSim
89  14234.0, # RelValTTbar_14TeV Run3_2023_FastSim PU = Run3_Flat55To75_PoissonOOTPU
90  2500.4, # RelValTTbar_14TeV NanoAOD from existing MINI
91 
92  # Phase2
93  24834.0, # RelValTTbar_14TeV phase2_realistic_T25 Extended2026D98 (Phase-2 baseline)
94  24834.911, # TTbar_14TeV_TuneCP5 phase2_realistic_T25 DD4hepExtended2026D98 DD4Hep (HLLHC14TeV BeamSpot)
95  25034.999, # RelValTTbar_14TeV (PREMIX) phase2_realistic_T25 Extended2026D98 AVE_50_BX_25ns_m3p3
96  24896.0, # RelValCloseByPGun_CE_E_Front_120um phase2_realistic_T25 Extended2026D98
97  24900.0, # RelValCloseByPGun_CE_H_Coarse_Scint phase2_realistic_T25 Extended2026D98
98  23234.0, # TTbar_14TeV_TuneCP5 phase2_realistic_T21 Extended2026D94 (exercise with HFNose)
99 
100 
101 
103  4.22, # Run2011A Cosmics
104  4.53, # Run2012B Photon miniAODs
105  1000, # Run2011A MinimumBias Prompt RecoTLR.customisePrompt
106  1001, # Run2011A MinimumBias Data+Express
107 
108  136.731, # Run2016B SinglePhoton
109  136.7611, # Run2016E JetHT (reMINIAOD) Run2_2016_HIPM + run2_miniAOD_80XLegacy
110  136.8311, # Run2017F JetHT (reMINIAOD) run2_miniAOD_94XFall17
111  136.88811, # Run2018D JetHT (reMINIAOD) run2_miniAOD_UL_preSummer20 (UL MINI)
112  136.793, # Run2017C DoubleEG
113  136.874, # Run2018C EGamma
114 
115 
117  139.001, # Run2021 MinimumBias Commissioning2021
118 
119  # 2022
120  140.023, # Run2022B ZeroBias
121  140.043, # Run2022C ZeroBias
122  140.063, # Run2022D ZeroBias
123 
124  # 2023
125  141.044, # Run2023D JetMET0
126  141.042, # Run2023D ZeroBias
127  141.046, # Run2023D EGamma0
128 
129 
132  140.53, # HIRun2011 HIMinBiasUPC
133  # Run2
134  140.56, # HIRun2018A HIHardProbes Run2_2018_pp_on_AA
135 
136  158.01, # RelValHydjetQ_B12_5020GeV_2018_ppReco (reMINIAOD) (HI MC with pp-like reco)
137  312.0, # Pyquen_ZeemumuJets_pt10_2760GeV PU : HiMixGEN
138 
139  ],
140  'jetmc': [5.1, 13, 15, 25, 38, 39], #MC
141  'metmc' : [5.1, 15, 25, 37, 38, 39], #MC
142  'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30], #MC
143  }
144 
145 
146  import argparse
147  usage = 'usage: runTheMatrix.py --show -s '
148 
149  parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
150 
151  parser.add_argument('-b','--batchName',
152  help='relval batch: suffix to be appended to Campaign name',
153  dest='batchName',
154  default='')
155 
156  parser.add_argument('-m','--memoryOffset',
157  help='memory of the wf for single core',
158  dest='memoryOffset',
159  type=int,
160  default=3000)
161 
162  parser.add_argument('--addMemPerCore',
163  help='increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
164  dest='memPerCore',
165  type=int,
166  default=1500)
167 
168  parser.add_argument('-j','--nproc',
169  help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
170  dest='nProcs',
171  type=int,
172  default=4)
173 
174  parser.add_argument('-t','--nThreads',
175  help='number of threads per process to use in cmsRun.',
176  dest='nThreads',
177  type=int,
178  default=1)
179 
180  parser.add_argument('--nStreams',
181  help='number of streams to use in cmsRun.',
182  dest='nStreams',
183  type=int,
184  default=0)
185 
186  parser.add_argument('--nEvents',
187  help='number of events to process in cmsRun. If 0 will use the standard 10 events.',
188  dest='nEvents',
189  type=int,
190  default=0)
191 
192  parser.add_argument('--numberEventsInLuminosityBlock',
193  help='number of events in a luminosity block',
194  dest='numberEventsInLuminosityBlock',
195  type=int,
196  default=-1)
197 
198  parser.add_argument('-n','--showMatrix',
199  help='Only show the worflows. Use --ext to show more',
200  dest='show',
201  default=False,
202  action='store_true')
203 
204  parser.add_argument('-e','--extended',
205  help='Show details of workflows, used with --show',
206  dest='extended',
207  default=False,
208  action='store_true')
209 
210  parser.add_argument('-s','--selected',
211  help='Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
212  dest='restricted',
213  default=False,
214  action='store_true')
215 
216  parser.add_argument('-l','--list',
217  help='Comma separated list of workflow to be shown or ran. Possible keys are also '+str(predefinedSet.keys())+'. and wild card like muon, or mc',
218  dest='testList',
219  default=None)
220 
221  parser.add_argument('-f','--failed-from',
222  help='Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
223  dest='failed_from',
224  default=None)
225 
226  parser.add_argument('-r','--raw',
227  help='Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
228  dest='raw')
229 
230  parser.add_argument('-i','--useInput',
231  help='Use recyling where available. Either all, or a comma separated list of wf number.',
232  dest='useInput',
233  type=lambda x: x.split(','),
234  default=None)
235 
236  parser.add_argument('-w','--what',
237  help='Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
238  dest='what',
239  default='all')
240 
241  parser.add_argument('--step1',
242  help='Used with --raw. Limit the production to step1',
243  dest='step1Only',
244  default=False)
245 
246  parser.add_argument('--maxSteps',
247  help='Only run maximum on maxSteps. Used when we are only interested in first n steps.',
248  dest='maxSteps',
249  default=9999,
250  type=int)
251 
252  parser.add_argument('--fromScratch',
253  help='Comma separated list of wf to be run without recycling. all is not supported as default.',
254  dest='fromScratch',
255  type=lambda x: x.split(','),
256  default=None)
257 
258  parser.add_argument('--refRelease',
259  help='Allow to modify the recycling dataset version',
260  dest='refRel',
261  default=None)
262 
263  parser.add_argument('--wmcontrol',
264  help='Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
265  choices=['init','test','submit','force'],
266  dest='wmcontrol',
267  default=None)
268 
269  parser.add_argument('--revertDqmio',
270  help='When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
271  choices=['yes','no'],
272  dest='revertDqmio',
273  default='no')
274 
275  parser.add_argument('--optionswm',
276  help='Specify a few things for wm injection',
277  default='',
278  dest='wmoptions')
279 
280  parser.add_argument('--keep',
281  help='allow to specify for which comma separated steps the output is needed',
282  default=None)
283 
284  parser.add_argument('--label',
285  help='allow to give a special label to the output dataset name',
286  default='')
287 
288  parser.add_argument('--command',
289  help='provide a way to add additional command to all of the cmsDriver commands in the matrix',
290  dest='command',
291  action='append',
292  default=None)
293 
294  parser.add_argument('--apply',
295  help='allow to use the --command only for 1 comma separeated',
296  dest='apply',
297  default=None)
298 
299  parser.add_argument('--workflow',
300  help='define a workflow to be created or altered from the matrix',
301  action='append',
302  dest='workflow',
303  default=None)
304 
305  parser.add_argument('--dryRun',
306  help='do not run the wf at all',
307  action='store_true',
308  dest='dryRun',
309  default=False)
310 
311  parser.add_argument('--testbed',
312  help='workflow injection to cmswebtest (you need dedicated rqmgr account)',
313  dest='testbed',
314  default=False,
315  action='store_true')
316 
317  parser.add_argument('--noCafVeto',
318  help='Run from any source, ignoring the CAF label',
319  dest='cafVeto',
320  default=True,
321  action='store_false')
322 
323  parser.add_argument('--overWrite',
324  help='Change the content of a step for another. List of pairs.',
325  dest='overWrite',
326  default=None)
327 
328  parser.add_argument('--noRun',
329  help='Remove all run list selection from wfs',
330  dest='noRun',
331  default=False,
332  action='store_true')
333 
334  parser.add_argument('--das-options',
335  help='Options to be passed to dasgoclient.',
336  dest='dasOptions',
337  default="--limit 0",
338  action='store')
339 
340  parser.add_argument('--job-reports',
341  help='Dump framework job reports',
342  dest='jobReports',
343  default=False,
344  action='store_true')
345 
346  parser.add_argument('--ibeos',
347  help='Use IB EOS site configuration',
348  dest='IBEos',
349  default=False,
350  action='store_true')
351 
352  parser.add_argument('--sites',
353  help='Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
354  dest='dasSites',
355  default='T2_CH_CERN',
356  action='store')
357 
358  parser.add_argument('--interactive',
359  help="Open the Matrix interactive shell",
360  action='store_true',
361  default=False)
362 
363  parser.add_argument('--dbs-url',
364  help='Overwrite DbsUrl value in JSON submitted to ReqMgr2',
365  dest='dbsUrl',
366  default=None,
367  action='store')
368 
369  gpugroup = parser.add_argument_group('GPU-related options','These options are only meaningful when --gpu is used, and is not set to forbidden.')
370 
371  gpugroup.add_argument('--gpu','--requires-gpu',
372  help='Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
373  dest='gpu',
374  choices=['forbidden', 'optional', 'required'],
375  nargs='?',
376  const='required',
377  default='forbidden',
378  action='store')
379 
380  gpugroup.add_argument('--gpu-memory',
381  help='Specify the minimum amount of GPU memory required by the job, in MB.',
382  dest='GPUMemoryMB',
383  type=int,
384  default=8000)
385 
386  gpugroup.add_argument('--cuda-capabilities',
387  help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
388  dest='CUDACapabilities',
389  type=lambda x: x.split(','),
390  default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
391 
392  # read the CUDA runtime version included in CMSSW
393  cudart_version = None
394  libcudart = os.path.realpath(os.path.expandvars('$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
395  if os.path.isfile(libcudart):
396  cudart_basename = os.path.basename(libcudart)
397  cudart_version = '.'.join(cudart_basename.split('.')[2:4])
398  gpugroup.add_argument('--cuda-runtime',
399  help='Specify major and minor version of the CUDA runtime used to build the application.',
400  dest='CUDARuntime',
401  default=cudart_version)
402 
403  gpugroup.add_argument('--force-gpu-name',
404  help='Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
405  dest='GPUName',
406  default='')
407 
408  gpugroup.add_argument('--force-cuda-driver-version',
409  help='Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
410  dest='CUDADriverVersion',
411  default='')
412 
413  gpugroup.add_argument('--force-cuda-runtime-version',
414  help='Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
415  dest='CUDARuntimeVersion',
416  default='')
417 
418  opt = parser.parse_args()
419  if opt.command: opt.command = ' '.join(opt.command)
420  os.environ["CMSSW_DAS_QUERY_SITES"]=opt.dasSites
421  if opt.failed_from:
422  rerunthese=[]
423  with open(opt.failed_from,'r') as report:
424  for report_line in report:
425  if 'FAILED' in report_line:
426  to_run,_=report_line.split('_',1)
427  rerunthese.append(to_run)
428  if opt.testList:
429  opt.testList+=','.join(['']+rerunthese)
430  else:
431  opt.testList = ','.join(rerunthese)
432 
433  if opt.IBEos:
434  from subprocess import getstatusoutput as run_cmd
435 
436  ibeos_cache = os.path.join(os.getenv("LOCALRT"), "ibeos_cache.txt")
437  if not os.path.exists(ibeos_cache):
438  err, out = run_cmd("curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
439  if err:
440  run_cmd("rm -f %s" % ibeos_cache)
441  print("Error: Unable to download ibeos cache information")
442  print(out)
443  sys.exit(err)
444 
445  for cmssw_env in [ "CMSSW_BASE", "CMSSW_RELEASE_BASE" ]:
446  cmssw_base = os.getenv(cmssw_env,None)
447  if not cmssw_base: continue
448  cmssw_base = os.path.join(cmssw_base,"src/Utilities/General/ibeos")
449  if os.path.exists(cmssw_base):
450  os.environ["PATH"]=cmssw_base+":"+os.getenv("PATH")
451  os.environ["CMS_PATH"]="/cvmfs/cms-ib.cern.ch"
452  os.environ["SITECONFIG_PATH"]="/cvmfs/cms-ib.cern.ch/SITECONF/local"
453  os.environ["CMSSW_USE_IBEOS"]="true"
454  print(">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
455  break
456  if opt.restricted:
457  print('Deprecated, please use -l limited')
458  if opt.testList: opt.testList+=',limited'
459  else: opt.testList='limited'
460 
461  def stepOrIndex(s):
462  if s.isdigit():
463  return int(s)
464  else:
465  return s
466  if opt.apply:
467  opt.apply=map(stepOrIndex,opt.apply.split(','))
468  if opt.keep:
469  opt.keep=map(stepOrIndex,opt.keep.split(','))
470 
471  if opt.testList:
472  testList=[]
473  for entry in opt.testList.split(','):
474  if not entry: continue
475  mapped=False
476  for k in predefinedSet:
477  if k.lower().startswith(entry.lower()) or k.lower().endswith(entry.lower()):
478  testList.extend(predefinedSet[k])
479  mapped=True
480  break
481  if not mapped:
482  try:
483  testList.append(float(entry))
484  except:
485  print(entry,'is not a possible selected entry')
486 
487  opt.testList = list(set(testList))
488 
489  if opt.wmcontrol:
491  if opt.overWrite:
492  opt.overWrite=eval(opt.overWrite)
493  if opt.interactive:
494  import cmd
495  from colorama import Fore, Style
496  from os import isatty
497  import subprocess
498  import time
499 
500  class TheMatrix(cmd.Cmd):
501  intro = "Welcome to the Matrix (? for help)"
502  prompt = "matrix> "
503 
504  def __init__(self, opt):
505  cmd.Cmd.__init__(self)
506  self.opt_ = opt
507  self.matrices_ = {}
508  tmp = MatrixReader(self.opt_)
509  self.processes_ = dict()
510  for what in tmp.files:
511  what = what.replace('relval_','')
512  self.opt_.what = what
513  self.matrices_[what] = MatrixReader(self.opt_)
514  self.matrices_[what].prepare(self.opt_.useInput, self.opt_.refRel,
515  self.opt_.fromScratch)
516  os.system("clear")
517 
518  def do_clear(self, arg):
519  """Clear the screen, put prompt at the top"""
520  os.system("clear")
521 
522  def do_exit(self, arg):
523  print("Leaving the Matrix")
524  return True
525 
526  def default(self, inp):
527  if inp == 'x' or inp == 'q':
528  return self.do_exit(inp)
529  else:
530  is_pipe = not isatty(sys.stdin.fileno())
531  print(Fore.RED + "Error: " + Fore.RESET + "unrecognized command.")
532  # Quit only if given a piped command.
533  if is_pipe:
534  sys.exit(1)
535 
536  def help_predefined(self):
537  print("\n".join(["predefined [predef1 [...]]\n",
538  "Run w/o argument, it will print the list of known predefined workflows.",
539  "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
540 
541  def complete_predefined(self, text, line, start_idx, end_idx):
542  if text and len(text) > 0:
543  return [t for t in predefinedSet.keys() if t.startswith(text)]
544  else:
545  return predefinedSet.keys()
546 
547  def do_predefined(self, arg):
548  """Print the list of predefined workflows"""
549  print("List of predefined workflows")
550  if arg:
551  for w in arg.split():
552  if w in predefinedSet.keys():
553  print("Predefined Set: %s" % w)
554  print(predefinedSet[w])
555  else:
556  print("Unknown Set: %s" % w)
557  else:
558  print("[ " + Fore.RED + ", ".join([str(k) for k in predefinedSet.keys()]) + Fore.RESET + " ]")
559 
560  def help_showWorkflow(self):
561  print("\n".join(["showWorkflow [workflow1 [...]]\n",
562  "Run w/o arguments, it will print the list of registered macro-workflows.",
563  "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
564 
565  def complete_showWorkflow(self, text, line, start_idx, end_idx):
566  if text and len(text) > 0:
567  return [t for t in self.matrices_.keys() if t.startswith(text)]
568  else:
569  return self.matrices_.keys()
570 
571  def do_showWorkflow(self, arg):
572  if arg == '':
573  print("Available workflows:")
574  for k in self.matrices_.keys():
575  print(Fore.RED + Style.BRIGHT + k)
576  print(Style.RESET_ALL)
577  else:
578  selected = arg.split()
579  for k in selected:
580  if k not in self.matrices_.keys():
581  print("Unknown workflow %s: skipping" % k)
582  else:
583  for wfl in self.matrices_[k].workFlows:
584  print("%s %s" % (Fore.BLUE + str(wfl.numId) + Fore.RESET,
585  Fore.GREEN + wfl.nameId + Fore.RESET))
586  print("%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.matrices_[k].workFlows)))
587 
588  def do_runWorkflow(self, arg):
589  # Split the input arguments into a list
590  args = arg.split()
591  if len(args) < 2:
592  print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed")
593  print(Style.RESET_ALL)
594  return
595  workflow_class = args[0]
596  workflow_id = args[1]
597  passed_down_args = list()
598  if len(args) > 2:
599  passed_down_args = args[2:]
600  print(Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n")
601  print(Fore.GREEN + Style.BRIGHT + "Workflow class: {}".format(workflow_class))
602  print(Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id))
603  print(Fore.GREEN + Style.BRIGHT + "Additional runTheMatrix options: {}".format(passed_down_args))
604  print(Style.RESET_ALL)
605  if workflow_class not in self.matrices_.keys():
606  print(Fore.RED + Style.BRIGHT + "Unknown workflow selected: {}".format(workflow_class))
607  print("Available workflows:")
608  for k in self.matrices_.keys():
609  print(Fore.RED + Style.BRIGHT + k)
610  print(Style.RESET_ALL)
611  return
612  wflnums = [x.numId for x in self.matrices_[workflow_class].workFlows]
613  if float(workflow_id) not in wflnums:
614  print(Fore.RED + Style.BRIGHT + "Unknown workflow {}".format(workflow_id))
615  print(Fore.GREEN + Style.BRIGHT)
616  print(wflnums)
617  print(Style.RESET_ALL)
618  return
619  if workflow_id in self.processes_.keys():
620  # Check if the process is still active
621  if self.processes_[workflow_id][0].poll() is None:
622  print(Fore.RED + Style.BRIGHT + "Workflow {} already running!".format(workflow_id))
623  print(Style.RESET_ALL)
624  return
625  # If it was there but it's gone, proceeed and update the value for the same key
626  # run a job, redirecting standard output and error to files
627  lognames = ['stdout', 'stderr']
628  logfiles = tuple('%s_%s_%s.log' % (workflow_class, workflow_id, name) for name in lognames)
629  stdout = open(logfiles[0], 'w')
630  stderr = open(logfiles[1], 'w')
631  command = ('runTheMatrix.py', '-w', workflow_class, '-l', workflow_id)
632  if len(passed_down_args) > 0:
633  command += tuple(passed_down_args)
634  print(command)
635  p = subprocess.Popen(command,
636  stdout = stdout,
637  stderr = stderr)
638  self.processes_[workflow_id] = (p, time.time())
639 
640 
641  def complete_runWorkflow(self, text, line, start_idx, end_idx):
642  if text and len(text) > 0:
643  return [t for t in self.matrices_.keys() if t.startswith(text)]
644  else:
645  return self.matrices_.keys()
646 
647  def help_runWorkflow(self):
648  print("\n".join(["runWorkflow workflow_class workflow_id\n",
649  "This command will launch a new and independent process that invokes",
650  "the command:\n",
651  "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
652  "\nYou can specify just one workflow_class and workflow_id per invocation.",
653  "The job will continue even after quitting the interactive session.",
654  "stdout and stderr of the new process will be automatically",
655  "redirected to 2 logfiles whose names contain the workflow_class",
656  "and workflow_id. Mutiple command can be issued one after the other.",
657  "The working directory of the new process will be the directory",
658  "from which the interactive session has started.",
659  "Autocompletion is available for workflow_class, but",
660  "not for workflow_id. Supplying a wrong workflow_class or",
661  "a non-existing workflow_id for a valid workflow_class",
662  "will trigger an error and no process will be invoked.",
663  "The interactive shell will keep track of all active processes",
664  "and will prevent the accidental resubmission of an already",
665  "active jobs."]))
666 
667  def do_jobs(self, args):
668  print(Fore.GREEN + Style.BRIGHT + "List of jobs:")
669  for w in self.processes_.keys():
670  if self.processes_[w][0].poll() is None:
671  print(Fore.YELLOW + Style.BRIGHT + "Active job: {} since {:.2f} seconds.".format(w, time.time() - self.processes_[w][1]))
672  else:
673  print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w))
674  print(Style.RESET_ALL)
675 
676  def help_jobs(self):
677  print("\n".join(["Print a full list of active and done jobs submitted",
678  "in the ongoing interactive session"]))
679 
681  print("\n".join(["searchInWorkflow wfl_name search_regexp\n",
682  "This command will search for a match within all workflows registered to wfl_name.",
683  "The search is done on both the workflow name and the names of steps registered to it."]))
684 
685  def complete_searchInWorkflow(self, text, line, start_idx, end_idx):
686  if text and len(text) > 0:
687  return [t for t in self.matrices_.keys() if t.startswith(text)]
688  else:
689  return self.matrices_.keys()
690 
691  def do_searchInWorkflow(self, arg):
692  args = arg.split()
693  if len(args) < 2:
694  print("searchInWorkflow name regexp")
695  return
696  if args[0] not in self.matrices_.keys():
697  print("Unknown workflow")
698  return
699  import re
700  pattern = None
701  try:
702  pattern = re.compile(args[1])
703  except:
704  print("Failed to compile regexp %s" % args[1])
705  return
706  counter = 0
707  for wfl in self.matrices_[args[0]].workFlows:
708  if re.match(pattern, wfl.nameId):
709  print("%s %s" % (Fore.BLUE + str(wfl.numId) + Fore.RESET,
710  Fore.GREEN + wfl.nameId + Fore.RESET))
711  counter +=1
712  print("Found %s compatible workflows inside %s" % (Fore.RED + str(counter) + Fore.RESET,
713  Fore.YELLOW + str(args[0])) + Fore.RESET)
714 
715  def help_search(self):
716  print("\n".join(["search search_regexp\n",
717  "This command will search for a match within all workflows registered.",
718  "The search is done on both the workflow name and the names of steps registered to it."]))
719 
720  def do_search(self, arg):
721  args = arg.split()
722  if len(args) < 1:
723  print("search regexp")
724  return
725  for wfl in self.matrices_.keys():
726  self.do_searchInWorkflow(' '.join([wfl, args[0]]))
727 
729  print("\n".join(["dumpWorkflowId [wfl-id1 [...]]\n",
730  "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
731 
732  def do_dumpWorkflowId(self, arg):
733  wflids = arg.split()
734  if len(wflids) == 0:
735  print("dumpWorkflowId [wfl-id1 [...]]")
736  return
737 
738  fmt = "[%s]: %s\n"
739  maxLen = 100
740  for wflid in wflids:
741  dump = True
742  for key, mrd in self.matrices_.items():
743  for wfl in mrd.workFlows:
744  if wfl.numId == float(wflid):
745  if dump:
746  dump = False
747  print(Fore.GREEN + str(wfl.numId) + Fore.RESET + " " + Fore.YELLOW + wfl.nameId + Fore.RESET)
748  for i,s in enumerate(wfl.cmds):
749  print(fmt % (Fore.RED + str(i+1) + Fore.RESET,
750  (str(s)+' ')))
751  print("\nWorkflow found in %s." % key)
752  else:
753  print("Workflow also found in %s." % key)
754 
755  do_EOF = do_exit
756 
757  TheMatrix(opt).cmdloop()
758  sys.exit(0)
759 
760  if opt.raw and opt.show:
761  ret = showRaw(opt)
762  else:
763  ret = runSelected(opt)
764 
765 
766  sys.exit(ret)
def performInjectionOptionTest(opt)
def runSelected(opt)
Definition: runTheMatrix.py:20
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def do_search(self, arg)
def showRaw(opt)
Definition: runTheMatrix.py:11
def do_clear(self, arg)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def __init__(self, opt)
static std::string join(char **cmd)
Definition: RemoteFile.cc:19
def do_exit(self, arg)
def default(self, inp)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def stepOrIndex(s)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
#define str(s)
def do_jobs(self, args)
def do_searchInWorkflow(self, arg)