2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'testListected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
140 'jetmc': [5.1, 13, 15, 25, 38, 39],
141 'metmc' : [5.1, 15, 25, 37, 38, 39],
142 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
147 usage =
'usage: runTheMatrix.py --show -s ' 149 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
151 parser.add_argument(
'-b',
'--batchName',
152 help=
'relval batch: suffix to be appended to Campaign name',
156 parser.add_argument(
'-m',
'--memoryOffset',
157 help=
'memory of the wf for single core',
162 parser.add_argument(
'--addMemPerCore',
163 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
168 parser.add_argument(
'-j',
'--nproc',
169 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
174 parser.add_argument(
'-t',
'--nThreads',
175 help=
'number of threads per process to use in cmsRun.',
180 parser.add_argument(
'--nStreams',
181 help=
'number of streams to use in cmsRun.',
186 parser.add_argument(
'--nEvents',
187 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
192 parser.add_argument(
'--numberEventsInLuminosityBlock',
193 help=
'number of events in a luminosity block',
194 dest=
'numberEventsInLuminosityBlock',
198 parser.add_argument(
'-n',
'--showMatrix',
199 help=
'Only show the worflows. Use --ext to show more',
204 parser.add_argument(
'-e',
'--extended',
205 help=
'Show details of workflows, used with --show',
210 parser.add_argument(
'-s',
'--selected',
211 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
216 parser.add_argument(
'-l',
'--list',
217 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
221 parser.add_argument(
'-f',
'--failed-from',
222 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
226 parser.add_argument(
'-r',
'--raw',
227 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
230 parser.add_argument(
'-i',
'--useInput',
231 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
233 type=
lambda x: x.split(
','),
236 parser.add_argument(
'-w',
'--what',
237 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
241 parser.add_argument(
'--step1',
242 help=
'Used with --raw. Limit the production to step1',
246 parser.add_argument(
'--maxSteps',
247 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
252 parser.add_argument(
'--fromScratch',
253 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
255 type=
lambda x: x.split(
','),
258 parser.add_argument(
'--refRelease',
259 help=
'Allow to modify the recycling dataset version',
263 parser.add_argument(
'--wmcontrol',
264 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
265 choices=[
'init',
'test',
'submit',
'force'],
269 parser.add_argument(
'--revertDqmio',
270 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
271 choices=[
'yes',
'no'],
275 parser.add_argument(
'--optionswm',
276 help=
'Specify a few things for wm injection',
280 parser.add_argument(
'--keep',
281 help=
'allow to specify for which comma separated steps the output is needed',
284 parser.add_argument(
'--label',
285 help=
'allow to give a special label to the output dataset name',
288 parser.add_argument(
'--command',
289 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
294 parser.add_argument(
'--apply',
295 help=
'allow to use the --command only for 1 comma separeated',
299 parser.add_argument(
'--workflow',
300 help=
'define a workflow to be created or altered from the matrix',
305 parser.add_argument(
'--dryRun',
306 help=
'do not run the wf at all',
311 parser.add_argument(
'--testbed',
312 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
317 parser.add_argument(
'--noCafVeto',
318 help=
'Run from any source, ignoring the CAF label',
321 action=
'store_false')
323 parser.add_argument(
'--overWrite',
324 help=
'Change the content of a step for another. List of pairs.',
328 parser.add_argument(
'--noRun',
329 help=
'Remove all run list selection from wfs',
334 parser.add_argument(
'--das-options',
335 help=
'Options to be passed to dasgoclient.',
340 parser.add_argument(
'--job-reports',
341 help=
'Dump framework job reports',
346 parser.add_argument(
'--ibeos',
347 help=
'Use IB EOS site configuration',
352 parser.add_argument(
'--sites',
353 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
355 default=
'T2_CH_CERN',
358 parser.add_argument(
'--interactive',
359 help=
"Open the Matrix interactive shell",
363 parser.add_argument(
'--dbs-url',
364 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
369 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
371 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
372 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
374 choices=[
'forbidden',
'optional',
'required'],
380 gpugroup.add_argument(
'--gpu-memory',
381 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
386 gpugroup.add_argument(
'--cuda-capabilities',
387 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
388 dest=
'CUDACapabilities',
389 type=
lambda x: x.split(
','),
390 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
393 cudart_version =
None 394 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
395 if os.path.isfile(libcudart):
396 cudart_basename = os.path.basename(libcudart)
397 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
398 gpugroup.add_argument(
'--cuda-runtime',
399 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
401 default=cudart_version)
403 gpugroup.add_argument(
'--force-gpu-name',
404 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
408 gpugroup.add_argument(
'--force-cuda-driver-version',
409 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
410 dest=
'CUDADriverVersion',
413 gpugroup.add_argument(
'--force-cuda-runtime-version',
414 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
415 dest=
'CUDARuntimeVersion',
418 opt = parser.parse_args()
419 if opt.command: opt.command =
' '.
join(opt.command)
420 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
423 with open(opt.failed_from,
'r') as report: 424 for report_line
in report:
425 if 'FAILED' in report_line:
426 to_run,_=report_line.split(
'_',1)
427 rerunthese.append(to_run)
429 opt.testList+=
','.
join([
'']+rerunthese)
431 opt.testList =
','.
join(rerunthese)
434 from subprocess
import getstatusoutput
as run_cmd
436 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
437 if not os.path.exists(ibeos_cache):
438 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
440 run_cmd(
"rm -f %s" % ibeos_cache)
441 print(
"Error: Unable to download ibeos cache information")
445 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
446 cmssw_base = os.getenv(cmssw_env,
None)
447 if not cmssw_base:
continue 448 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
449 if os.path.exists(cmssw_base):
450 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
451 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 452 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 453 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 454 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
457 print(
'Deprecated, please use -l limited')
458 if opt.testList: opt.testList+=
',limited' 459 else: opt.testList=
'limited' 467 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
469 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
473 for entry
in opt.testList.split(
','):
474 if not entry:
continue 476 for k
in predefinedSet:
477 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
478 testList.extend(predefinedSet[k])
483 testList.append(
float(entry))
485 print(entry,
'is not a possible selected entry')
487 opt.testList = list(set(testList))
492 opt.overWrite=eval(opt.overWrite)
495 from colorama
import Fore, Style
496 from os
import isatty
501 intro =
"Welcome to the Matrix (? for help)" 505 cmd.Cmd.__init__(self)
510 for what
in tmp.files:
511 what = what.replace(
'relval_',
'')
512 self.
opt_.what = what
515 self.
opt_.fromScratch)
519 """Clear the screen, put prompt at the top""" 523 print(
"Leaving the Matrix")
527 if inp ==
'x' or inp ==
'q':
530 is_pipe =
not isatty(sys.stdin.fileno())
531 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
537 print(
"\n".
join([
"predefined [predef1 [...]]\n",
538 "Run w/o argument, it will print the list of known predefined workflows.",
539 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
542 if text
and len(text) > 0:
543 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
545 return predefinedSet.keys()
548 """Print the list of predefined workflows""" 549 print(
"List of predefined workflows")
551 for w
in arg.split():
552 if w
in predefinedSet.keys():
553 print(
"Predefined Set: %s" % w)
554 print(predefinedSet[w])
556 print(
"Unknown Set: %s" % w)
558 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
561 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
562 "Run w/o arguments, it will print the list of registered macro-workflows.",
563 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
566 if text
and len(text) > 0:
567 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
573 print(
"Available workflows:")
575 print(Fore.RED + Style.BRIGHT + k)
576 print(Style.RESET_ALL)
578 selected = arg.split()
581 print(
"Unknown workflow %s: skipping" % k)
584 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
585 Fore.GREEN + wfl.nameId + Fore.RESET))
586 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
592 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
593 print(Style.RESET_ALL)
595 workflow_class = args[0]
596 workflow_id = args[1]
597 passed_down_args = list()
599 passed_down_args = args[2:]
600 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
601 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
602 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
603 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
604 print(Style.RESET_ALL)
606 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
607 print(
"Available workflows:")
609 print(Fore.RED + Style.BRIGHT + k)
610 print(Style.RESET_ALL)
612 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
613 if float(workflow_id)
not in wflnums:
614 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
615 print(Fore.GREEN + Style.BRIGHT)
617 print(Style.RESET_ALL)
621 if self.
processes_[workflow_id][0].poll()
is None:
622 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
623 print(Style.RESET_ALL)
627 lognames = [
'stdout',
'stderr']
628 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
629 stdout = open(logfiles[0],
'w')
630 stderr = open(logfiles[1],
'w')
631 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
632 if len(passed_down_args) > 0:
633 command +=
tuple(passed_down_args)
635 p = subprocess.Popen(command,
638 self.
processes_[workflow_id] = (p, time.time())
642 if text
and len(text) > 0:
643 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
648 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
649 "This command will launch a new and independent process that invokes",
651 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
652 "\nYou can specify just one workflow_class and workflow_id per invocation.",
653 "The job will continue even after quitting the interactive session.",
654 "stdout and stderr of the new process will be automatically",
655 "redirected to 2 logfiles whose names contain the workflow_class",
656 "and workflow_id. Mutiple command can be issued one after the other.",
657 "The working directory of the new process will be the directory",
658 "from which the interactive session has started.",
659 "Autocompletion is available for workflow_class, but",
660 "not for workflow_id. Supplying a wrong workflow_class or",
661 "a non-existing workflow_id for a valid workflow_class",
662 "will trigger an error and no process will be invoked.",
663 "The interactive shell will keep track of all active processes",
664 "and will prevent the accidental resubmission of an already",
668 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
671 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
673 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
674 print(Style.RESET_ALL)
677 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
678 "in the ongoing interactive session"]))
681 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
682 "This command will search for a match within all workflows registered to wfl_name.",
683 "The search is done on both the workflow name and the names of steps registered to it."]))
686 if text
and len(text) > 0:
687 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
694 print(
"searchInWorkflow name regexp")
697 print(
"Unknown workflow")
702 pattern = re.compile(args[1])
704 print(
"Failed to compile regexp %s" % args[1])
707 for wfl
in self.
matrices_[args[0]].workFlows:
708 if re.match(pattern, wfl.nameId):
709 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
710 Fore.GREEN + wfl.nameId + Fore.RESET))
712 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
713 Fore.YELLOW +
str(args[0])) + Fore.RESET)
716 print(
"\n".
join([
"search search_regexp\n",
717 "This command will search for a match within all workflows registered.",
718 "The search is done on both the workflow name and the names of steps registered to it."]))
723 print(
"search regexp")
729 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
730 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
735 print(
"dumpWorkflowId [wfl-id1 [...]]")
743 for wfl
in mrd.workFlows:
744 if wfl.numId ==
float(wflid):
747 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
748 for i,s
in enumerate(wfl.cmds):
749 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
751 print(
"\nWorkflow found in %s." % key)
753 print(
"Workflow also found in %s." % key)
760 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)