CMS 3D CMS Logo

runTheMatrix.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 from __future__ import print_function
3 import sys, os
4 
5 from Configuration.PyReleaseValidation.MatrixReader import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector,performInjectionOptionTest
8 
9 # ================================================================================
10 
11 def showRaw(opt):
12 
13  mrd = MatrixReader(opt)
14  mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
15 
16  return 0
17 
18 # ================================================================================
19 
20 def runSelected(opt):
21 
22  mrd = MatrixReader(opt)
23  mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
24 
25  # test for wrong input workflows
26  if opt.testList:
27  definedWf = [dwf.numId for dwf in mrd.workFlows]
28  definedSet = set(definedWf)
29  testSet = set(opt.testList)
30  undefSet = testSet - definedSet
31  if len(undefSet)>0: raise ValueError('Undefined workflows: '+', '.join(map(str,list(undefSet))))
32  duplicates = [wf for wf in testSet if definedWf.count(wf)>1 ]
33  if len(duplicates)>0: raise ValueError('Duplicated workflows: '+', '.join(map(str,list(duplicates))))
34 
35  ret = 0
36  if opt.show:
37  mrd.show(opt.testList, opt.extended, opt.cafVeto)
38  if opt.testList : print('selected items:', opt.testList)
39  else:
40  mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41  ret = mRunnerHi.runTests(opt)
42 
43  if opt.wmcontrol:
44  if ret!=0:
45  print('Cannot go on with wmagent injection with failing workflows')
46  else:
47  wfInjector = MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48  ret= wfInjector.prepare(mrd,
49  mRunnerHi.runDirs)
50  if ret==0:
51  wfInjector.upload()
52  wfInjector.submit()
53  return ret
54 
55 # ================================================================================
56 
57 if __name__ == '__main__':
58 
59  #this can get out of here
60  predefinedSet={
61  'limited' : [
62  # See README for further details
63 
66  5.1, # TTbar_8TeV_TuneCUETP8M1 FastSim
67  8, # RelValBeamHalo Cosmics
68  9.0, # RelValHiggs200ChargedTaus
69  25, # RelValTTbar
70  101.0, # SingleElectronE120EHCAL + ECALHCAL.customise + fullMixCustomize_cff.setCrossingFrameOn
71 
72  # Run2
73  7.3, # UndergroundCosmicSPLooseMu
74  1306.0, # RelValSingleMuPt1_UP15
75  1330, # RelValZMM_13
76  135.4, # ZEE_13TeV_TuneCUETP8M1
77  25202.0, # RelValTTbar_13 PU = AVE_35_BX_25ns
78  250202.181, # RelValTTbar_13 PREMIX
79 
80  # Run3
81  11634.0, # TTbar_14TeV 2021
82  13234.0, # RelValTTbar_14TeV 2021 FastsSim
83  12434.0, # RelValTTbar_14TeV 2023
84  12834.0, # RelValTTbar_14TeV 2024
85  12846.0, # RelValZEE_13 2024
86  13034.0, # RelValTTbar_14TeV 2024 PU = Run3_Flat55To75_PoissonOOTPU
87  12834.7, # RelValTTbar_14TeV 2024 mkFit
88  14034.0, # RelValTTbar_14TeV Run3_2023_FastSim
89  14234.0, # RelValTTbar_14TeV Run3_2023_FastSim PU = Run3_Flat55To75_PoissonOOTPU
90  2500.201, # RelValTTbar_14TeV NanoAOD from existing MINI
91 
92  # Phase2
93  29634.0, # RelValTTbar_14TeV phase2_realistic_T33 Extended2026D110 (Phase-2 baseline)
94  29634.911, # TTbar_14TeV_TuneCP5 phase2_realistic_T33 DD4hepExtended2026D110 DD4Hep (HLLHC14TeV BeamSpot)
95  29834.999, # RelValTTbar_14TeV (PREMIX) phase2_realistic_T33 Extended2026D110 AVE_50_BX_25ns_m3p3
96  29696.0, # RelValCloseByPGun_CE_E_Front_120um phase2_realistic_T33 Extended2026D110
97  29700.0, # RelValCloseByPGun_CE_H_Coarse_Scint phase2_realistic_T33 Extended2026D110
98  23234.0, # TTbar_14TeV_TuneCP5 phase2_realistic_T21 Extended2026D94 (exercise with HFNose)
99 
100 
101 
103  4.22, # Run2011A Cosmics
104  4.53, # Run2012B Photon miniAODs
105  1000, # Run2011A MinimumBias Prompt RecoTLR.customisePrompt
106  1001, # Run2011A MinimumBias Data+Express
107 
108  136.731, # Run2016B SinglePhoton
109  136.793, # Run2017C DoubleEG
110  136.874, # Run2018C EGamma
111 
112 
114  139.001, # Run2021 MinimumBias Commissioning2021
115 
116  # 2022
117  140.023, # Run2022B ZeroBias
118  140.043, # Run2022C ZeroBias
119  140.063, # Run2022D ZeroBias
120 
121  # 2023
122  141.044, # Run2023D JetMET0
123  141.042, # Run2023D ZeroBias
124  141.046, # Run2023D EGamma0
125 
126 
129  140.56, # HIRun2018A HIHardProbes Run2_2018_pp_on_AA
130 
131  312.0, # Pyquen_ZeemumuJets_pt10_2760GeV PU : HiMixGEN
132 
133  ],
134  'jetmc': [5.1, 13, 15, 25, 38, 39], #MC
135  'metmc' : [5.1, 15, 25, 37, 38, 39], #MC
136  'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30], #MC
137  }
138 
139 
140  import argparse
141  usage = 'usage: runTheMatrix.py --show -s '
142 
143  parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
144 
145  parser.add_argument('-b','--batchName',
146  help='relval batch: suffix to be appended to Campaign name',
147  dest='batchName',
148  default='')
149 
150  parser.add_argument('-m','--memoryOffset',
151  help='memory of the wf for single core',
152  dest='memoryOffset',
153  type=int,
154  default=3000)
155 
156  parser.add_argument('--addMemPerCore',
157  help='increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
158  dest='memPerCore',
159  type=int,
160  default=1500)
161 
162  parser.add_argument('-j','--nproc',
163  help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
164  dest='nProcs',
165  type=int,
166  default=4)
167 
168  parser.add_argument('-t','--nThreads',
169  help='number of threads per process to use in cmsRun.',
170  dest='nThreads',
171  type=int,
172  default=1)
173 
174  parser.add_argument('--nStreams',
175  help='number of streams to use in cmsRun.',
176  dest='nStreams',
177  type=int,
178  default=0)
179 
180  parser.add_argument('--nEvents',
181  help='number of events to process in cmsRun. If 0 will use the standard 10 events.',
182  dest='nEvents',
183  type=int,
184  default=0)
185 
186  parser.add_argument('--numberEventsInLuminosityBlock',
187  help='number of events in a luminosity block',
188  dest='numberEventsInLuminosityBlock',
189  type=int,
190  default=-1)
191 
192  parser.add_argument('-n','--showMatrix',
193  help='Only show the worflows. Use --ext to show more',
194  dest='show',
195  default=False,
196  action='store_true')
197 
198  parser.add_argument('-e','--extended',
199  help='Show details of workflows, used with --show',
200  dest='extended',
201  default=False,
202  action='store_true')
203 
204  parser.add_argument('-s','--selected',
205  help='Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
206  dest='restricted',
207  default=False,
208  action='store_true')
209 
210  parser.add_argument('-l','--list',
211  help='Comma separated list of workflow to be shown or ran. Possible keys are also '+str(predefinedSet.keys())+'. and wild card like muon, or mc',
212  dest='testList',
213  default=None)
214 
215  parser.add_argument('-f','--failed-from',
216  help='Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
217  dest='failed_from',
218  default=None)
219 
220  parser.add_argument('-r','--raw',
221  help='Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
222  dest='raw')
223 
224  parser.add_argument('-i','--useInput',
225  help='Use recyling where available. Either all, or a comma separated list of wf number.',
226  dest='useInput',
227  type=lambda x: x.split(','),
228  default=None)
229 
230  parser.add_argument('-w','--what',
231  help='Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
232  dest='what',
233  default='all')
234 
235  parser.add_argument('--step1',
236  help='Used with --raw. Limit the production to step1',
237  dest='step1Only',
238  default=False)
239 
240  parser.add_argument('--maxSteps',
241  help='Only run maximum on maxSteps. Used when we are only interested in first n steps.',
242  dest='maxSteps',
243  default=9999,
244  type=int)
245 
246  parser.add_argument('--fromScratch',
247  help='Comma separated list of wf to be run without recycling. all is not supported as default.',
248  dest='fromScratch',
249  type=lambda x: x.split(','),
250  default=None)
251 
252  parser.add_argument('--refRelease',
253  help='Allow to modify the recycling dataset version',
254  dest='refRel',
255  default=None)
256 
257  parser.add_argument('--wmcontrol',
258  help='Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
259  choices=['init','test','submit','force'],
260  dest='wmcontrol',
261  default=None)
262 
263  parser.add_argument('--revertDqmio',
264  help='When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
265  choices=['yes','no'],
266  dest='revertDqmio',
267  default='no')
268 
269  parser.add_argument('--optionswm',
270  help='Specify a few things for wm injection',
271  default='',
272  dest='wmoptions')
273 
274  parser.add_argument('--keep',
275  help='allow to specify for which comma separated steps the output is needed',
276  default=None)
277 
278  parser.add_argument('--label',
279  help='allow to give a special label to the output dataset name',
280  default='')
281 
282  parser.add_argument('--command',
283  help='provide a way to add additional command to all of the cmsDriver commands in the matrix',
284  dest='command',
285  action='append',
286  default=None)
287 
288  parser.add_argument('--apply',
289  help='allow to use the --command only for 1 comma separeated',
290  dest='apply',
291  default=None)
292 
293  parser.add_argument('--workflow',
294  help='define a workflow to be created or altered from the matrix',
295  action='append',
296  dest='workflow',
297  default=None)
298 
299  parser.add_argument('--dryRun',
300  help='do not run the wf at all',
301  action='store_true',
302  dest='dryRun',
303  default=False)
304 
305  parser.add_argument('--testbed',
306  help='workflow injection to cmswebtest (you need dedicated rqmgr account)',
307  dest='testbed',
308  default=False,
309  action='store_true')
310 
311  parser.add_argument('--noCafVeto',
312  help='Run from any source, ignoring the CAF label',
313  dest='cafVeto',
314  default=True,
315  action='store_false')
316 
317  parser.add_argument('--overWrite',
318  help='Change the content of a step for another. List of pairs.',
319  dest='overWrite',
320  default=None)
321 
322  parser.add_argument('--noRun',
323  help='Remove all run list selection from wfs',
324  dest='noRun',
325  default=False,
326  action='store_true')
327 
328  parser.add_argument('--das-options',
329  help='Options to be passed to dasgoclient.',
330  dest='dasOptions',
331  default="--limit 0",
332  action='store')
333 
334  parser.add_argument('--job-reports',
335  help='Dump framework job reports',
336  dest='jobReports',
337  default=False,
338  action='store_true')
339 
340  parser.add_argument('--ibeos',
341  help='Use IB EOS site configuration',
342  dest='IBEos',
343  default=False,
344  action='store_true')
345 
346  parser.add_argument('--sites',
347  help='Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
348  dest='dasSites',
349  default='T2_CH_CERN',
350  action='store')
351 
352  parser.add_argument('--interactive',
353  help="Open the Matrix interactive shell",
354  action='store_true',
355  default=False)
356 
357  parser.add_argument('--dbs-url',
358  help='Overwrite DbsUrl value in JSON submitted to ReqMgr2',
359  dest='dbsUrl',
360  default=None,
361  action='store')
362 
363  gpugroup = parser.add_argument_group('GPU-related options','These options are only meaningful when --gpu is used, and is not set to forbidden.')
364 
365  gpugroup.add_argument('--gpu','--requires-gpu',
366  help='Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
367  dest='gpu',
368  choices=['forbidden', 'optional', 'required'],
369  nargs='?',
370  const='required',
371  default='forbidden',
372  action='store')
373 
374  gpugroup.add_argument('--gpu-memory',
375  help='Specify the minimum amount of GPU memory required by the job, in MB.',
376  dest='GPUMemoryMB',
377  type=int,
378  default=8000)
379 
380  gpugroup.add_argument('--cuda-capabilities',
381  help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
382  dest='CUDACapabilities',
383  type=lambda x: x.split(','),
384  default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
385 
386  # read the CUDA runtime version included in CMSSW
387  cudart_version = None
388  libcudart = os.path.realpath(os.path.expandvars('$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
389  if os.path.isfile(libcudart):
390  cudart_basename = os.path.basename(libcudart)
391  cudart_version = '.'.join(cudart_basename.split('.')[2:4])
392  gpugroup.add_argument('--cuda-runtime',
393  help='Specify major and minor version of the CUDA runtime used to build the application.',
394  dest='CUDARuntime',
395  default=cudart_version)
396 
397  gpugroup.add_argument('--force-gpu-name',
398  help='Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
399  dest='GPUName',
400  default='')
401 
402  gpugroup.add_argument('--force-cuda-driver-version',
403  help='Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
404  dest='CUDADriverVersion',
405  default='')
406 
407  gpugroup.add_argument('--force-cuda-runtime-version',
408  help='Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
409  dest='CUDARuntimeVersion',
410  default='')
411 
412  opt = parser.parse_args()
413  if opt.command: opt.command = ' '.join(opt.command)
414  os.environ["CMSSW_DAS_QUERY_SITES"]=opt.dasSites
415  if opt.failed_from:
416  rerunthese=[]
417  with open(opt.failed_from,'r') as report:
418  for report_line in report:
419  if 'FAILED' in report_line:
420  to_run,_=report_line.split('_',1)
421  rerunthese.append(to_run)
422  if opt.testList:
423  opt.testList+=','.join(['']+rerunthese)
424  else:
425  opt.testList = ','.join(rerunthese)
426 
427  if opt.IBEos:
428  from subprocess import getstatusoutput as run_cmd
429 
430  ibeos_cache = os.path.join(os.getenv("LOCALRT"), "ibeos_cache.txt")
431  if not os.path.exists(ibeos_cache):
432  err, out = run_cmd("curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
433  if err:
434  run_cmd("rm -f %s" % ibeos_cache)
435  print("Error: Unable to download ibeos cache information")
436  print(out)
437  sys.exit(err)
438 
439  for cmssw_env in [ "CMSSW_BASE", "CMSSW_RELEASE_BASE" ]:
440  cmssw_base = os.getenv(cmssw_env,None)
441  if not cmssw_base: continue
442  cmssw_base = os.path.join(cmssw_base,"src/Utilities/General/ibeos")
443  if os.path.exists(cmssw_base):
444  os.environ["PATH"]=cmssw_base+":"+os.getenv("PATH")
445  os.environ["CMS_PATH"]="/cvmfs/cms-ib.cern.ch"
446  os.environ["SITECONFIG_PATH"]="/cvmfs/cms-ib.cern.ch/SITECONF/local"
447  os.environ["CMSSW_USE_IBEOS"]="true"
448  print(">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
449  break
450  if opt.restricted:
451  print('Deprecated, please use -l limited')
452  if opt.testList: opt.testList+=',limited'
453  else: opt.testList='limited'
454 
455  def stepOrIndex(s):
456  if s.isdigit():
457  return int(s)
458  else:
459  return s
460  if opt.apply:
461  opt.apply=map(stepOrIndex,opt.apply.split(','))
462  if opt.keep:
463  opt.keep=map(stepOrIndex,opt.keep.split(','))
464 
465  if opt.testList:
466  testList=[]
467  for entry in opt.testList.split(','):
468  if not entry: continue
469  mapped=False
470  for k in predefinedSet:
471  if k.lower().startswith(entry.lower()) or k.lower().endswith(entry.lower()):
472  testList.extend(predefinedSet[k])
473  mapped=True
474  break
475  if not mapped:
476  try:
477  testList.append(float(entry))
478  except:
479  print(entry,'is not a possible selected entry')
480 
481  opt.testList = list(set(testList))
482 
483  if opt.wmcontrol:
485  if opt.overWrite:
486  opt.overWrite=eval(opt.overWrite)
487  if opt.interactive:
488  import cmd
489  from colorama import Fore, Style
490  from os import isatty
491  import subprocess
492  import time
493 
494  class TheMatrix(cmd.Cmd):
495  intro = "Welcome to the Matrix (? for help)"
496  prompt = "matrix> "
497 
498  def __init__(self, opt):
499  cmd.Cmd.__init__(self)
500  self.opt_ = opt
501  self.matrices_ = {}
502  tmp = MatrixReader(self.opt_)
503  self.processes_ = dict()
504  for what in tmp.files:
505  what = what.replace('relval_','')
506  self.opt_.what = what
507  self.matrices_[what] = MatrixReader(self.opt_)
508  self.matrices_[what].prepare(self.opt_.useInput, self.opt_.refRel,
509  self.opt_.fromScratch)
510  os.system("clear")
511 
512  def do_clear(self, arg):
513  """Clear the screen, put prompt at the top"""
514  os.system("clear")
515 
516  def do_exit(self, arg):
517  print("Leaving the Matrix")
518  return True
519 
520  def default(self, inp):
521  if inp == 'x' or inp == 'q':
522  return self.do_exit(inp)
523  else:
524  is_pipe = not isatty(sys.stdin.fileno())
525  print(Fore.RED + "Error: " + Fore.RESET + "unrecognized command.")
526  # Quit only if given a piped command.
527  if is_pipe:
528  sys.exit(1)
529 
530  def help_predefined(self):
531  print("\n".join(["predefined [predef1 [...]]\n",
532  "Run w/o argument, it will print the list of known predefined workflows.",
533  "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
534 
535  def complete_predefined(self, text, line, start_idx, end_idx):
536  if text and len(text) > 0:
537  return [t for t in predefinedSet.keys() if t.startswith(text)]
538  else:
539  return predefinedSet.keys()
540 
541  def do_predefined(self, arg):
542  """Print the list of predefined workflows"""
543  print("List of predefined workflows")
544  if arg:
545  for w in arg.split():
546  if w in predefinedSet.keys():
547  print("Predefined Set: %s" % w)
548  print(predefinedSet[w])
549  else:
550  print("Unknown Set: %s" % w)
551  else:
552  print("[ " + Fore.RED + ", ".join([str(k) for k in predefinedSet.keys()]) + Fore.RESET + " ]")
553 
554  def help_showWorkflow(self):
555  print("\n".join(["showWorkflow [workflow1 [...]]\n",
556  "Run w/o arguments, it will print the list of registered macro-workflows.",
557  "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
558 
559  def complete_showWorkflow(self, text, line, start_idx, end_idx):
560  if text and len(text) > 0:
561  return [t for t in self.matrices_.keys() if t.startswith(text)]
562  else:
563  return self.matrices_.keys()
564 
565  def do_showWorkflow(self, arg):
566  if arg == '':
567  print("Available workflows:")
568  for k in self.matrices_.keys():
569  print(Fore.RED + Style.BRIGHT + k)
570  print(Style.RESET_ALL)
571  else:
572  selected = arg.split()
573  for k in selected:
574  if k not in self.matrices_.keys():
575  print("Unknown workflow %s: skipping" % k)
576  else:
577  for wfl in self.matrices_[k].workFlows:
578  print("%s %s" % (Fore.BLUE + str(wfl.numId) + Fore.RESET,
579  Fore.GREEN + wfl.nameId + Fore.RESET))
580  print("%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.matrices_[k].workFlows)))
581 
582  def do_runWorkflow(self, arg):
583  # Split the input arguments into a list
584  args = arg.split()
585  if len(args) < 2:
586  print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed")
587  print(Style.RESET_ALL)
588  return
589  workflow_class = args[0]
590  workflow_id = args[1]
591  passed_down_args = list()
592  if len(args) > 2:
593  passed_down_args = args[2:]
594  print(Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n")
595  print(Fore.GREEN + Style.BRIGHT + "Workflow class: {}".format(workflow_class))
596  print(Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id))
597  print(Fore.GREEN + Style.BRIGHT + "Additional runTheMatrix options: {}".format(passed_down_args))
598  print(Style.RESET_ALL)
599  if workflow_class not in self.matrices_.keys():
600  print(Fore.RED + Style.BRIGHT + "Unknown workflow selected: {}".format(workflow_class))
601  print("Available workflows:")
602  for k in self.matrices_.keys():
603  print(Fore.RED + Style.BRIGHT + k)
604  print(Style.RESET_ALL)
605  return
606  wflnums = [x.numId for x in self.matrices_[workflow_class].workFlows]
607  if float(workflow_id) not in wflnums:
608  print(Fore.RED + Style.BRIGHT + "Unknown workflow {}".format(workflow_id))
609  print(Fore.GREEN + Style.BRIGHT)
610  print(wflnums)
611  print(Style.RESET_ALL)
612  return
613  if workflow_id in self.processes_.keys():
614  # Check if the process is still active
615  if self.processes_[workflow_id][0].poll() is None:
616  print(Fore.RED + Style.BRIGHT + "Workflow {} already running!".format(workflow_id))
617  print(Style.RESET_ALL)
618  return
619  # If it was there but it's gone, proceeed and update the value for the same key
620  # run a job, redirecting standard output and error to files
621  lognames = ['stdout', 'stderr']
622  logfiles = tuple('%s_%s_%s.log' % (workflow_class, workflow_id, name) for name in lognames)
623  stdout = open(logfiles[0], 'w')
624  stderr = open(logfiles[1], 'w')
625  command = ('runTheMatrix.py', '-w', workflow_class, '-l', workflow_id)
626  if len(passed_down_args) > 0:
627  command += tuple(passed_down_args)
628  print(command)
629  p = subprocess.Popen(command,
630  stdout = stdout,
631  stderr = stderr)
632  self.processes_[workflow_id] = (p, time.time())
633 
634 
635  def complete_runWorkflow(self, text, line, start_idx, end_idx):
636  if text and len(text) > 0:
637  return [t for t in self.matrices_.keys() if t.startswith(text)]
638  else:
639  return self.matrices_.keys()
640 
641  def help_runWorkflow(self):
642  print("\n".join(["runWorkflow workflow_class workflow_id\n",
643  "This command will launch a new and independent process that invokes",
644  "the command:\n",
645  "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
646  "\nYou can specify just one workflow_class and workflow_id per invocation.",
647  "The job will continue even after quitting the interactive session.",
648  "stdout and stderr of the new process will be automatically",
649  "redirected to 2 logfiles whose names contain the workflow_class",
650  "and workflow_id. Mutiple command can be issued one after the other.",
651  "The working directory of the new process will be the directory",
652  "from which the interactive session has started.",
653  "Autocompletion is available for workflow_class, but",
654  "not for workflow_id. Supplying a wrong workflow_class or",
655  "a non-existing workflow_id for a valid workflow_class",
656  "will trigger an error and no process will be invoked.",
657  "The interactive shell will keep track of all active processes",
658  "and will prevent the accidental resubmission of an already",
659  "active jobs."]))
660 
661  def do_jobs(self, args):
662  print(Fore.GREEN + Style.BRIGHT + "List of jobs:")
663  for w in self.processes_.keys():
664  if self.processes_[w][0].poll() is None:
665  print(Fore.YELLOW + Style.BRIGHT + "Active job: {} since {:.2f} seconds.".format(w, time.time() - self.processes_[w][1]))
666  else:
667  print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w))
668  print(Style.RESET_ALL)
669 
670  def help_jobs(self):
671  print("\n".join(["Print a full list of active and done jobs submitted",
672  "in the ongoing interactive session"]))
673 
675  print("\n".join(["searchInWorkflow wfl_name search_regexp\n",
676  "This command will search for a match within all workflows registered to wfl_name.",
677  "The search is done on both the workflow name and the names of steps registered to it."]))
678 
679  def complete_searchInWorkflow(self, text, line, start_idx, end_idx):
680  if text and len(text) > 0:
681  return [t for t in self.matrices_.keys() if t.startswith(text)]
682  else:
683  return self.matrices_.keys()
684 
685  def do_searchInWorkflow(self, arg):
686  args = arg.split()
687  if len(args) < 2:
688  print("searchInWorkflow name regexp")
689  return
690  if args[0] not in self.matrices_.keys():
691  print("Unknown workflow")
692  return
693  import re
694  pattern = None
695  try:
696  pattern = re.compile(args[1])
697  except:
698  print("Failed to compile regexp %s" % args[1])
699  return
700  counter = 0
701  for wfl in self.matrices_[args[0]].workFlows:
702  if re.match(pattern, wfl.nameId):
703  print("%s %s" % (Fore.BLUE + str(wfl.numId) + Fore.RESET,
704  Fore.GREEN + wfl.nameId + Fore.RESET))
705  counter +=1
706  print("Found %s compatible workflows inside %s" % (Fore.RED + str(counter) + Fore.RESET,
707  Fore.YELLOW + str(args[0])) + Fore.RESET)
708 
709  def help_search(self):
710  print("\n".join(["search search_regexp\n",
711  "This command will search for a match within all workflows registered.",
712  "The search is done on both the workflow name and the names of steps registered to it."]))
713 
714  def do_search(self, arg):
715  args = arg.split()
716  if len(args) < 1:
717  print("search regexp")
718  return
719  for wfl in self.matrices_.keys():
720  self.do_searchInWorkflow(' '.join([wfl, args[0]]))
721 
723  print("\n".join(["dumpWorkflowId [wfl-id1 [...]]\n",
724  "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
725 
726  def do_dumpWorkflowId(self, arg):
727  wflids = arg.split()
728  if len(wflids) == 0:
729  print("dumpWorkflowId [wfl-id1 [...]]")
730  return
731 
732  fmt = "[%s]: %s\n"
733  maxLen = 100
734  for wflid in wflids:
735  dump = True
736  for key, mrd in self.matrices_.items():
737  for wfl in mrd.workFlows:
738  if wfl.numId == float(wflid):
739  if dump:
740  dump = False
741  print(Fore.GREEN + str(wfl.numId) + Fore.RESET + " " + Fore.YELLOW + wfl.nameId + Fore.RESET)
742  for i,s in enumerate(wfl.cmds):
743  print(fmt % (Fore.RED + str(i+1) + Fore.RESET,
744  (str(s)+' ')))
745  print("\nWorkflow found in %s." % key)
746  else:
747  print("Workflow also found in %s." % key)
748 
749  do_EOF = do_exit
750 
751  TheMatrix(opt).cmdloop()
752  sys.exit(0)
753 
754  if opt.raw and opt.show:
755  ret = showRaw(opt)
756  else:
757  ret = runSelected(opt)
758 
759 
760  sys.exit(ret)
def performInjectionOptionTest(opt)
def runSelected(opt)
Definition: runTheMatrix.py:20
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def do_search(self, arg)
def showRaw(opt)
Definition: runTheMatrix.py:11
def do_clear(self, arg)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def __init__(self, opt)
static std::string join(char **cmd)
Definition: RemoteFile.cc:21
def do_exit(self, arg)
def default(self, inp)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def stepOrIndex(s)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
#define str(s)
def do_jobs(self, args)
def do_searchInWorkflow(self, arg)