2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'selected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
137 'jetmc': [5.1, 13, 15, 25, 38, 39],
138 'metmc' : [5.1, 15, 25, 37, 38, 39],
139 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
144 usage =
'usage: runTheMatrix.py --show -s ' 146 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
148 parser.add_argument(
'-b',
'--batchName',
149 help=
'relval batch: suffix to be appended to Campaign name',
153 parser.add_argument(
'-m',
'--memoryOffset',
154 help=
'memory of the wf for single core',
159 parser.add_argument(
'--addMemPerCore',
160 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
165 parser.add_argument(
'-j',
'--nproc',
166 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
171 parser.add_argument(
'-t',
'--nThreads',
172 help=
'number of threads per process to use in cmsRun.',
177 parser.add_argument(
'--nStreams',
178 help=
'number of streams to use in cmsRun.',
183 parser.add_argument(
'--nEvents',
184 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
189 parser.add_argument(
'--numberEventsInLuminosityBlock',
190 help=
'number of events in a luminosity block',
191 dest=
'numberEventsInLuminosityBlock',
195 parser.add_argument(
'-n',
'--showMatrix',
196 help=
'Only show the worflows. Use --ext to show more',
201 parser.add_argument(
'-e',
'--extended',
202 help=
'Show details of workflows, used with --show',
207 parser.add_argument(
'-s',
'--selected',
208 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
213 parser.add_argument(
'-l',
'--list',
214 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
218 parser.add_argument(
'-f',
'--failed-from',
219 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
223 parser.add_argument(
'-r',
'--raw',
224 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
227 parser.add_argument(
'-i',
'--useInput',
228 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
230 type=
lambda x: x.split(
','),
233 parser.add_argument(
'-w',
'--what',
234 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
238 parser.add_argument(
'--step1',
239 help=
'Used with --raw. Limit the production to step1',
243 parser.add_argument(
'--maxSteps',
244 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
249 parser.add_argument(
'--fromScratch',
250 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
252 type=
lambda x: x.split(
','),
255 parser.add_argument(
'--refRelease',
256 help=
'Allow to modify the recycling dataset version',
260 parser.add_argument(
'--wmcontrol',
261 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
262 choices=[
'init',
'test',
'submit',
'force'],
266 parser.add_argument(
'--revertDqmio',
267 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
268 choices=[
'yes',
'no'],
272 parser.add_argument(
'--optionswm',
273 help=
'Specify a few things for wm injection',
277 parser.add_argument(
'--keep',
278 help=
'allow to specify for which comma separated steps the output is needed',
281 parser.add_argument(
'--label',
282 help=
'allow to give a special label to the output dataset name',
285 parser.add_argument(
'--command',
286 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
291 parser.add_argument(
'--apply',
292 help=
'allow to use the --command only for 1 comma separeated',
296 parser.add_argument(
'--workflow',
297 help=
'define a workflow to be created or altered from the matrix',
302 parser.add_argument(
'--dryRun',
303 help=
'do not run the wf at all',
308 parser.add_argument(
'--testbed',
309 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
314 parser.add_argument(
'--noCafVeto',
315 help=
'Run from any source, ignoring the CAF label',
318 action=
'store_false')
320 parser.add_argument(
'--overWrite',
321 help=
'Change the content of a step for another. List of pairs.',
325 parser.add_argument(
'--noRun',
326 help=
'Remove all run list selection from wfs',
331 parser.add_argument(
'--das-options',
332 help=
'Options to be passed to dasgoclient.',
337 parser.add_argument(
'--job-reports',
338 help=
'Dump framework job reports',
343 parser.add_argument(
'--ibeos',
344 help=
'Use IB EOS site configuration',
349 parser.add_argument(
'--sites',
350 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
352 default=
'T2_CH_CERN',
355 parser.add_argument(
'--interactive',
356 help=
"Open the Matrix interactive shell",
360 parser.add_argument(
'--dbs-url',
361 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
366 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
368 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
369 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
371 choices=[
'forbidden',
'optional',
'required'],
377 gpugroup.add_argument(
'--gpu-memory',
378 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
383 gpugroup.add_argument(
'--cuda-capabilities',
384 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
385 dest=
'CUDACapabilities',
386 type=
lambda x: x.split(
','),
387 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
390 cudart_version =
None 391 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
392 if os.path.isfile(libcudart):
393 cudart_basename = os.path.basename(libcudart)
394 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
395 gpugroup.add_argument(
'--cuda-runtime',
396 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
398 default=cudart_version)
400 gpugroup.add_argument(
'--force-gpu-name',
401 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
405 gpugroup.add_argument(
'--force-cuda-driver-version',
406 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
407 dest=
'CUDADriverVersion',
410 gpugroup.add_argument(
'--force-cuda-runtime-version',
411 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
412 dest=
'CUDARuntimeVersion',
415 opt = parser.parse_args()
416 if opt.command: opt.command =
' '.
join(opt.command)
417 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
420 with open(opt.failed_from,
'r') as report: 421 for report_line
in report:
422 if 'FAILED' in report_line:
423 to_run,_=report_line.split(
'_',1)
424 rerunthese.append(to_run)
426 opt.testList+=
','.
join([
'']+rerunthese)
428 opt.testList =
','.
join(rerunthese)
431 from subprocess
import getstatusoutput
as run_cmd
433 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
434 if not os.path.exists(ibeos_cache):
435 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
437 run_cmd(
"rm -f %s" % ibeos_cache)
438 print(
"Error: Unable to download ibeos cache information")
442 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
443 cmssw_base = os.getenv(cmssw_env,
None)
444 if not cmssw_base:
continue 445 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
446 if os.path.exists(cmssw_base):
447 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
448 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 449 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 450 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 451 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
454 print(
'Deprecated, please use -l limited')
455 if opt.testList: opt.testList+=
',limited' 456 else: opt.testList=
'limited' 464 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
466 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
470 for entry
in opt.testList.split(
','):
471 if not entry:
continue 473 for k
in predefinedSet:
474 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
475 testList.extend(predefinedSet[k])
480 testList.append(
float(entry))
482 print(entry,
'is not a possible selected entry')
484 opt.testList = list(set(testList))
489 opt.overWrite=eval(opt.overWrite)
492 from colorama
import Fore, Style
493 from os
import isatty
498 intro =
"Welcome to the Matrix (? for help)" 502 cmd.Cmd.__init__(self)
507 for what
in tmp.files:
508 what = what.replace(
'relval_',
'')
509 self.
opt_.what = what
512 self.
opt_.fromScratch)
516 """Clear the screen, put prompt at the top""" 520 print(
"Leaving the Matrix")
524 if inp ==
'x' or inp ==
'q':
527 is_pipe =
not isatty(sys.stdin.fileno())
528 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
534 print(
"\n".
join([
"predefined [predef1 [...]]\n",
535 "Run w/o argument, it will print the list of known predefined workflows.",
536 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
539 if text
and len(text) > 0:
540 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
542 return predefinedSet.keys()
545 """Print the list of predefined workflows""" 546 print(
"List of predefined workflows")
548 for w
in arg.split():
549 if w
in predefinedSet.keys():
550 print(
"Predefined Set: %s" % w)
551 print(predefinedSet[w])
553 print(
"Unknown Set: %s" % w)
555 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
558 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
559 "Run w/o arguments, it will print the list of registered macro-workflows.",
560 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
563 if text
and len(text) > 0:
564 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
570 print(
"Available workflows:")
572 print(Fore.RED + Style.BRIGHT + k)
573 print(Style.RESET_ALL)
575 selected = arg.split()
578 print(
"Unknown workflow %s: skipping" % k)
581 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
582 Fore.GREEN + wfl.nameId + Fore.RESET))
583 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
589 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
590 print(Style.RESET_ALL)
592 workflow_class = args[0]
593 workflow_id = args[1]
594 passed_down_args = list()
596 passed_down_args = args[2:]
597 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
598 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
599 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
600 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
601 print(Style.RESET_ALL)
603 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
604 print(
"Available workflows:")
606 print(Fore.RED + Style.BRIGHT + k)
607 print(Style.RESET_ALL)
609 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
610 if float(workflow_id)
not in wflnums:
611 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
612 print(Fore.GREEN + Style.BRIGHT)
614 print(Style.RESET_ALL)
618 if self.
processes_[workflow_id][0].poll()
is None:
619 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
620 print(Style.RESET_ALL)
624 lognames = [
'stdout',
'stderr']
625 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
626 stdout = open(logfiles[0],
'w')
627 stderr = open(logfiles[1],
'w')
628 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
629 if len(passed_down_args) > 0:
630 command +=
tuple(passed_down_args)
632 p = subprocess.Popen(command,
635 self.
processes_[workflow_id] = (p, time.time())
639 if text
and len(text) > 0:
640 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
645 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
646 "This command will launch a new and independent process that invokes",
648 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
649 "\nYou can specify just one workflow_class and workflow_id per invocation.",
650 "The job will continue even after quitting the interactive session.",
651 "stdout and stderr of the new process will be automatically",
652 "redirected to 2 logfiles whose names contain the workflow_class",
653 "and workflow_id. Mutiple command can be issued one after the other.",
654 "The working directory of the new process will be the directory",
655 "from which the interactive session has started.",
656 "Autocompletion is available for workflow_class, but",
657 "not for workflow_id. Supplying a wrong workflow_class or",
658 "a non-existing workflow_id for a valid workflow_class",
659 "will trigger an error and no process will be invoked.",
660 "The interactive shell will keep track of all active processes",
661 "and will prevent the accidental resubmission of an already",
665 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
668 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
670 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
671 print(Style.RESET_ALL)
674 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
675 "in the ongoing interactive session"]))
678 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
679 "This command will search for a match within all workflows registered to wfl_name.",
680 "The search is done on both the workflow name and the names of steps registered to it."]))
683 if text
and len(text) > 0:
684 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
691 print(
"searchInWorkflow name regexp")
694 print(
"Unknown workflow")
699 pattern = re.compile(args[1])
701 print(
"Failed to compile regexp %s" % args[1])
704 for wfl
in self.
matrices_[args[0]].workFlows:
705 if re.match(pattern, wfl.nameId):
706 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
707 Fore.GREEN + wfl.nameId + Fore.RESET))
709 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
710 Fore.YELLOW +
str(args[0])) + Fore.RESET)
713 print(
"\n".
join([
"search search_regexp\n",
714 "This command will search for a match within all workflows registered.",
715 "The search is done on both the workflow name and the names of steps registered to it."]))
720 print(
"search regexp")
726 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
727 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
732 print(
"dumpWorkflowId [wfl-id1 [...]]")
740 for wfl
in mrd.workFlows:
741 if wfl.numId ==
float(wflid):
744 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
745 for i,s
in enumerate(wfl.cmds):
746 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
748 print(
"\nWorkflow found in %s." % key)
750 print(
"Workflow also found in %s." % key)
757 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)