2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'selected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
139 'jetmc': [5.1, 13, 15, 25, 38, 39],
140 'metmc' : [5.1, 15, 25, 37, 38, 39],
141 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
146 usage =
'usage: runTheMatrix.py --show -s ' 148 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
150 parser.add_argument(
'-b',
'--batchName',
151 help=
'relval batch: suffix to be appended to Campaign name',
155 parser.add_argument(
'-m',
'--memoryOffset',
156 help=
'memory of the wf for single core',
161 parser.add_argument(
'--addMemPerCore',
162 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
167 parser.add_argument(
'-j',
'--nproc',
168 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
173 parser.add_argument(
'-t',
'--nThreads',
174 help=
'number of threads per process to use in cmsRun.',
179 parser.add_argument(
'--nStreams',
180 help=
'number of streams to use in cmsRun.',
185 parser.add_argument(
'--nEvents',
186 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
191 parser.add_argument(
'--numberEventsInLuminosityBlock',
192 help=
'number of events in a luminosity block',
193 dest=
'numberEventsInLuminosityBlock',
197 parser.add_argument(
'-n',
'--showMatrix',
198 help=
'Only show the worflows. Use --ext to show more',
203 parser.add_argument(
'-e',
'--extended',
204 help=
'Show details of workflows, used with --show',
209 parser.add_argument(
'-s',
'--selected',
210 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
215 parser.add_argument(
'-l',
'--list',
216 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
220 parser.add_argument(
'-f',
'--failed-from',
221 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
225 parser.add_argument(
'-r',
'--raw',
226 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
229 parser.add_argument(
'-i',
'--useInput',
230 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
232 type=
lambda x: x.split(
','),
235 parser.add_argument(
'-w',
'--what',
236 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
240 parser.add_argument(
'--step1',
241 help=
'Used with --raw. Limit the production to step1',
245 parser.add_argument(
'--maxSteps',
246 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
251 parser.add_argument(
'--fromScratch',
252 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
254 type=
lambda x: x.split(
','),
257 parser.add_argument(
'--refRelease',
258 help=
'Allow to modify the recycling dataset version',
262 parser.add_argument(
'--wmcontrol',
263 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
264 choices=[
'init',
'test',
'submit',
'force'],
268 parser.add_argument(
'--revertDqmio',
269 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
270 choices=[
'yes',
'no'],
274 parser.add_argument(
'--optionswm',
275 help=
'Specify a few things for wm injection',
279 parser.add_argument(
'--keep',
280 help=
'allow to specify for which comma separated steps the output is needed',
283 parser.add_argument(
'--label',
284 help=
'allow to give a special label to the output dataset name',
287 parser.add_argument(
'--command',
288 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
293 parser.add_argument(
'--apply',
294 help=
'allow to use the --command only for 1 comma separeated',
298 parser.add_argument(
'--workflow',
299 help=
'define a workflow to be created or altered from the matrix',
304 parser.add_argument(
'--dryRun',
305 help=
'do not run the wf at all',
310 parser.add_argument(
'--testbed',
311 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
316 parser.add_argument(
'--noCafVeto',
317 help=
'Run from any source, ignoring the CAF label',
320 action=
'store_false')
322 parser.add_argument(
'--overWrite',
323 help=
'Change the content of a step for another. List of pairs.',
327 parser.add_argument(
'--noRun',
328 help=
'Remove all run list selection from wfs',
333 parser.add_argument(
'--das-options',
334 help=
'Options to be passed to dasgoclient.',
339 parser.add_argument(
'--job-reports',
340 help=
'Dump framework job reports',
345 parser.add_argument(
'--ibeos',
346 help=
'Use IB EOS site configuration',
351 parser.add_argument(
'--sites',
352 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
354 default=
'T2_CH_CERN',
357 parser.add_argument(
'--interactive',
358 help=
"Open the Matrix interactive shell",
362 parser.add_argument(
'--dbs-url',
363 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
368 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
370 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
371 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
373 choices=[
'forbidden',
'optional',
'required'],
379 gpugroup.add_argument(
'--gpu-memory',
380 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
385 gpugroup.add_argument(
'--cuda-capabilities',
386 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
387 dest=
'CUDACapabilities',
388 type=
lambda x: x.split(
','),
389 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
392 cudart_version =
None 393 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
394 if os.path.isfile(libcudart):
395 cudart_basename = os.path.basename(libcudart)
396 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
397 gpugroup.add_argument(
'--cuda-runtime',
398 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
400 default=cudart_version)
402 gpugroup.add_argument(
'--force-gpu-name',
403 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
407 gpugroup.add_argument(
'--force-cuda-driver-version',
408 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
409 dest=
'CUDADriverVersion',
412 gpugroup.add_argument(
'--force-cuda-runtime-version',
413 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
414 dest=
'CUDARuntimeVersion',
417 opt = parser.parse_args()
418 if opt.command: opt.command =
' '.
join(opt.command)
419 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
422 with open(opt.failed_from,
'r') as report: 423 for report_line
in report:
424 if 'FAILED' in report_line:
425 to_run,_=report_line.split(
'_',1)
426 rerunthese.append(to_run)
428 opt.testList+=
','.
join([
'']+rerunthese)
430 opt.testList =
','.
join(rerunthese)
433 from subprocess
import getstatusoutput
as run_cmd
435 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
436 if not os.path.exists(ibeos_cache):
437 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
439 run_cmd(
"rm -f %s" % ibeos_cache)
440 print(
"Error: Unable to download ibeos cache information")
444 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
445 cmssw_base = os.getenv(cmssw_env,
None)
446 if not cmssw_base:
continue 447 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
448 if os.path.exists(cmssw_base):
449 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
450 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 451 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 452 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 453 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
456 print(
'Deprecated, please use -l limited')
457 if opt.testList: opt.testList+=
',limited' 458 else: opt.testList=
'limited' 466 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
468 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
472 for entry
in opt.testList.split(
','):
473 if not entry:
continue 475 for k
in predefinedSet:
476 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
477 testList.extend(predefinedSet[k])
482 testList.append(
float(entry))
484 print(entry,
'is not a possible selected entry')
486 opt.testList = list(set(testList))
491 opt.overWrite=eval(opt.overWrite)
494 from colorama
import Fore, Style
495 from os
import isatty
500 intro =
"Welcome to the Matrix (? for help)" 504 cmd.Cmd.__init__(self)
509 for what
in tmp.files:
510 what = what.replace(
'relval_',
'')
511 self.
opt_.what = what
514 self.
opt_.fromScratch)
518 """Clear the screen, put prompt at the top""" 522 print(
"Leaving the Matrix")
526 if inp ==
'x' or inp ==
'q':
529 is_pipe =
not isatty(sys.stdin.fileno())
530 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
536 print(
"\n".
join([
"predefined [predef1 [...]]\n",
537 "Run w/o argument, it will print the list of known predefined workflows.",
538 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
541 if text
and len(text) > 0:
542 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
544 return predefinedSet.keys()
547 """Print the list of predefined workflows""" 548 print(
"List of predefined workflows")
550 for w
in arg.split():
551 if w
in predefinedSet.keys():
552 print(
"Predefined Set: %s" % w)
553 print(predefinedSet[w])
555 print(
"Unknown Set: %s" % w)
557 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
560 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
561 "Run w/o arguments, it will print the list of registered macro-workflows.",
562 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
565 if text
and len(text) > 0:
566 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
572 print(
"Available workflows:")
574 print(Fore.RED + Style.BRIGHT + k)
575 print(Style.RESET_ALL)
577 selected = arg.split()
580 print(
"Unknown workflow %s: skipping" % k)
583 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
584 Fore.GREEN + wfl.nameId + Fore.RESET))
585 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
591 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
592 print(Style.RESET_ALL)
594 workflow_class = args[0]
595 workflow_id = args[1]
596 passed_down_args = list()
598 passed_down_args = args[2:]
599 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
600 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
601 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
602 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
603 print(Style.RESET_ALL)
605 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
606 print(
"Available workflows:")
608 print(Fore.RED + Style.BRIGHT + k)
609 print(Style.RESET_ALL)
611 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
612 if float(workflow_id)
not in wflnums:
613 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
614 print(Fore.GREEN + Style.BRIGHT)
616 print(Style.RESET_ALL)
620 if self.
processes_[workflow_id][0].poll()
is None:
621 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
622 print(Style.RESET_ALL)
626 lognames = [
'stdout',
'stderr']
627 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
628 stdout = open(logfiles[0],
'w')
629 stderr = open(logfiles[1],
'w')
630 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
631 if len(passed_down_args) > 0:
632 command +=
tuple(passed_down_args)
634 p = subprocess.Popen(command,
637 self.
processes_[workflow_id] = (p, time.time())
641 if text
and len(text) > 0:
642 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
647 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
648 "This command will launch a new and independent process that invokes",
650 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
651 "\nYou can specify just one workflow_class and workflow_id per invocation.",
652 "The job will continue even after quitting the interactive session.",
653 "stdout and stderr of the new process will be automatically",
654 "redirected to 2 logfiles whose names contain the workflow_class",
655 "and workflow_id. Mutiple command can be issued one after the other.",
656 "The working directory of the new process will be the directory",
657 "from which the interactive session has started.",
658 "Autocompletion is available for workflow_class, but",
659 "not for workflow_id. Supplying a wrong workflow_class or",
660 "a non-existing workflow_id for a valid workflow_class",
661 "will trigger an error and no process will be invoked.",
662 "The interactive shell will keep track of all active processes",
663 "and will prevent the accidental resubmission of an already",
667 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
670 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
672 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
673 print(Style.RESET_ALL)
676 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
677 "in the ongoing interactive session"]))
680 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
681 "This command will search for a match within all workflows registered to wfl_name.",
682 "The search is done on both the workflow name and the names of steps registered to it."]))
685 if text
and len(text) > 0:
686 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
693 print(
"searchInWorkflow name regexp")
696 print(
"Unknown workflow")
701 pattern = re.compile(args[1])
703 print(
"Failed to compile regexp %s" % args[1])
706 for wfl
in self.
matrices_[args[0]].workFlows:
707 if re.match(pattern, wfl.nameId):
708 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
709 Fore.GREEN + wfl.nameId + Fore.RESET))
711 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
712 Fore.YELLOW +
str(args[0])) + Fore.RESET)
715 print(
"\n".
join([
"search search_regexp\n",
716 "This command will search for a match within all workflows registered.",
717 "The search is done on both the workflow name and the names of steps registered to it."]))
722 print(
"search regexp")
728 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
729 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
734 print(
"dumpWorkflowId [wfl-id1 [...]]")
742 for wfl
in mrd.workFlows:
743 if wfl.numId ==
float(wflid):
746 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
747 for i,s
in enumerate(wfl.cmds):
748 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
750 print(
"\nWorkflow found in %s." % key)
752 print(
"Workflow also found in %s." % key)
759 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)