2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'testListected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
138 'jetmc': [5.1, 13, 15, 25, 38, 39],
139 'metmc' : [5.1, 15, 25, 37, 38, 39],
140 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
145 usage =
'usage: runTheMatrix.py --show -s ' 147 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
149 parser.add_argument(
'-b',
'--batchName',
150 help=
'relval batch: suffix to be appended to Campaign name',
154 parser.add_argument(
'-m',
'--memoryOffset',
155 help=
'memory of the wf for single core',
160 parser.add_argument(
'--addMemPerCore',
161 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
166 parser.add_argument(
'-j',
'--nproc',
167 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
172 parser.add_argument(
'-t',
'--nThreads',
173 help=
'number of threads per process to use in cmsRun.',
178 parser.add_argument(
'--nStreams',
179 help=
'number of streams to use in cmsRun.',
184 parser.add_argument(
'--nEvents',
185 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
190 parser.add_argument(
'--numberEventsInLuminosityBlock',
191 help=
'number of events in a luminosity block',
192 dest=
'numberEventsInLuminosityBlock',
196 parser.add_argument(
'-n',
'--showMatrix',
197 help=
'Only show the worflows. Use --ext to show more',
202 parser.add_argument(
'-e',
'--extended',
203 help=
'Show details of workflows, used with --show',
208 parser.add_argument(
'-s',
'--selected',
209 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
214 parser.add_argument(
'-l',
'--list',
215 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
219 parser.add_argument(
'-f',
'--failed-from',
220 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
224 parser.add_argument(
'-r',
'--raw',
225 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
228 parser.add_argument(
'-i',
'--useInput',
229 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
231 type=
lambda x: x.split(
','),
234 parser.add_argument(
'-w',
'--what',
235 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
239 parser.add_argument(
'--step1',
240 help=
'Used with --raw. Limit the production to step1',
244 parser.add_argument(
'--maxSteps',
245 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
250 parser.add_argument(
'--fromScratch',
251 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
253 type=
lambda x: x.split(
','),
256 parser.add_argument(
'--refRelease',
257 help=
'Allow to modify the recycling dataset version',
261 parser.add_argument(
'--wmcontrol',
262 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
263 choices=[
'init',
'test',
'submit',
'force'],
267 parser.add_argument(
'--revertDqmio',
268 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
269 choices=[
'yes',
'no'],
273 parser.add_argument(
'--optionswm',
274 help=
'Specify a few things for wm injection',
278 parser.add_argument(
'--keep',
279 help=
'allow to specify for which comma separated steps the output is needed',
282 parser.add_argument(
'--label',
283 help=
'allow to give a special label to the output dataset name',
286 parser.add_argument(
'--command',
287 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
292 parser.add_argument(
'--apply',
293 help=
'allow to use the --command only for 1 comma separeated',
297 parser.add_argument(
'--workflow',
298 help=
'define a workflow to be created or altered from the matrix',
303 parser.add_argument(
'--dryRun',
304 help=
'do not run the wf at all',
309 parser.add_argument(
'--testbed',
310 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
315 parser.add_argument(
'--noCafVeto',
316 help=
'Run from any source, ignoring the CAF label',
319 action=
'store_false')
321 parser.add_argument(
'--overWrite',
322 help=
'Change the content of a step for another. List of pairs.',
326 parser.add_argument(
'--noRun',
327 help=
'Remove all run list selection from wfs',
332 parser.add_argument(
'--das-options',
333 help=
'Options to be passed to dasgoclient.',
338 parser.add_argument(
'--job-reports',
339 help=
'Dump framework job reports',
344 parser.add_argument(
'--ibeos',
345 help=
'Use IB EOS site configuration',
350 parser.add_argument(
'--sites',
351 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
353 default=
'T2_CH_CERN',
356 parser.add_argument(
'--interactive',
357 help=
"Open the Matrix interactive shell",
361 parser.add_argument(
'--dbs-url',
362 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
367 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
369 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
370 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
372 choices=[
'forbidden',
'optional',
'required'],
378 gpugroup.add_argument(
'--gpu-memory',
379 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
384 gpugroup.add_argument(
'--cuda-capabilities',
385 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
386 dest=
'CUDACapabilities',
387 type=
lambda x: x.split(
','),
388 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
391 cudart_version =
None 392 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
393 if os.path.isfile(libcudart):
394 cudart_basename = os.path.basename(libcudart)
395 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
396 gpugroup.add_argument(
'--cuda-runtime',
397 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
399 default=cudart_version)
401 gpugroup.add_argument(
'--force-gpu-name',
402 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
406 gpugroup.add_argument(
'--force-cuda-driver-version',
407 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
408 dest=
'CUDADriverVersion',
411 gpugroup.add_argument(
'--force-cuda-runtime-version',
412 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
413 dest=
'CUDARuntimeVersion',
416 opt = parser.parse_args()
417 if opt.command: opt.command =
' '.
join(opt.command)
418 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
421 with open(opt.failed_from,
'r') as report: 422 for report_line
in report:
423 if 'FAILED' in report_line:
424 to_run,_=report_line.split(
'_',1)
425 rerunthese.append(to_run)
427 opt.testList+=
','.
join([
'']+rerunthese)
429 opt.testList =
','.
join(rerunthese)
432 from subprocess
import getstatusoutput
as run_cmd
434 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
435 if not os.path.exists(ibeos_cache):
436 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
438 run_cmd(
"rm -f %s" % ibeos_cache)
439 print(
"Error: Unable to download ibeos cache information")
443 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
444 cmssw_base = os.getenv(cmssw_env,
None)
445 if not cmssw_base:
continue 446 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
447 if os.path.exists(cmssw_base):
448 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
449 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 450 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 451 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 452 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
455 print(
'Deprecated, please use -l limited')
456 if opt.testList: opt.testList+=
',limited' 457 else: opt.testList=
'limited' 465 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
467 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
471 for entry
in opt.testList.split(
','):
472 if not entry:
continue 474 for k
in predefinedSet:
475 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
476 testList.extend(predefinedSet[k])
481 testList.append(
float(entry))
483 print(entry,
'is not a possible selected entry')
485 opt.testList = list(set(testList))
490 opt.overWrite=eval(opt.overWrite)
493 from colorama
import Fore, Style
494 from os
import isatty
499 intro =
"Welcome to the Matrix (? for help)" 503 cmd.Cmd.__init__(self)
508 for what
in tmp.files:
509 what = what.replace(
'relval_',
'')
510 self.
opt_.what = what
513 self.
opt_.fromScratch)
517 """Clear the screen, put prompt at the top""" 521 print(
"Leaving the Matrix")
525 if inp ==
'x' or inp ==
'q':
528 is_pipe =
not isatty(sys.stdin.fileno())
529 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
535 print(
"\n".
join([
"predefined [predef1 [...]]\n",
536 "Run w/o argument, it will print the list of known predefined workflows.",
537 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
540 if text
and len(text) > 0:
541 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
543 return predefinedSet.keys()
546 """Print the list of predefined workflows""" 547 print(
"List of predefined workflows")
549 for w
in arg.split():
550 if w
in predefinedSet.keys():
551 print(
"Predefined Set: %s" % w)
552 print(predefinedSet[w])
554 print(
"Unknown Set: %s" % w)
556 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
559 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
560 "Run w/o arguments, it will print the list of registered macro-workflows.",
561 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
564 if text
and len(text) > 0:
565 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
571 print(
"Available workflows:")
573 print(Fore.RED + Style.BRIGHT + k)
574 print(Style.RESET_ALL)
576 selected = arg.split()
579 print(
"Unknown workflow %s: skipping" % k)
582 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
583 Fore.GREEN + wfl.nameId + Fore.RESET))
584 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
590 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
591 print(Style.RESET_ALL)
593 workflow_class = args[0]
594 workflow_id = args[1]
595 passed_down_args = list()
597 passed_down_args = args[2:]
598 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
599 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
600 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
601 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
602 print(Style.RESET_ALL)
604 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
605 print(
"Available workflows:")
607 print(Fore.RED + Style.BRIGHT + k)
608 print(Style.RESET_ALL)
610 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
611 if float(workflow_id)
not in wflnums:
612 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
613 print(Fore.GREEN + Style.BRIGHT)
615 print(Style.RESET_ALL)
619 if self.
processes_[workflow_id][0].poll()
is None:
620 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
621 print(Style.RESET_ALL)
625 lognames = [
'stdout',
'stderr']
626 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
627 stdout = open(logfiles[0],
'w')
628 stderr = open(logfiles[1],
'w')
629 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
630 if len(passed_down_args) > 0:
631 command +=
tuple(passed_down_args)
633 p = subprocess.Popen(command,
636 self.
processes_[workflow_id] = (p, time.time())
640 if text
and len(text) > 0:
641 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
646 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
647 "This command will launch a new and independent process that invokes",
649 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
650 "\nYou can specify just one workflow_class and workflow_id per invocation.",
651 "The job will continue even after quitting the interactive session.",
652 "stdout and stderr of the new process will be automatically",
653 "redirected to 2 logfiles whose names contain the workflow_class",
654 "and workflow_id. Mutiple command can be issued one after the other.",
655 "The working directory of the new process will be the directory",
656 "from which the interactive session has started.",
657 "Autocompletion is available for workflow_class, but",
658 "not for workflow_id. Supplying a wrong workflow_class or",
659 "a non-existing workflow_id for a valid workflow_class",
660 "will trigger an error and no process will be invoked.",
661 "The interactive shell will keep track of all active processes",
662 "and will prevent the accidental resubmission of an already",
666 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
669 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
671 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
672 print(Style.RESET_ALL)
675 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
676 "in the ongoing interactive session"]))
679 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
680 "This command will search for a match within all workflows registered to wfl_name.",
681 "The search is done on both the workflow name and the names of steps registered to it."]))
684 if text
and len(text) > 0:
685 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
692 print(
"searchInWorkflow name regexp")
695 print(
"Unknown workflow")
700 pattern = re.compile(args[1])
702 print(
"Failed to compile regexp %s" % args[1])
705 for wfl
in self.
matrices_[args[0]].workFlows:
706 if re.match(pattern, wfl.nameId):
707 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
708 Fore.GREEN + wfl.nameId + Fore.RESET))
710 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
711 Fore.YELLOW +
str(args[0])) + Fore.RESET)
714 print(
"\n".
join([
"search search_regexp\n",
715 "This command will search for a match within all workflows registered.",
716 "The search is done on both the workflow name and the names of steps registered to it."]))
721 print(
"search regexp")
727 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
728 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
733 print(
"dumpWorkflowId [wfl-id1 [...]]")
741 for wfl
in mrd.workFlows:
742 if wfl.numId ==
float(wflid):
745 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
746 for i,s
in enumerate(wfl.cmds):
747 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
749 print(
"\nWorkflow found in %s." % key)
751 print(
"Workflow also found in %s." % key)
758 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)