2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'testListected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
111 'jetmc': [5.1, 13, 15, 25, 38, 39],
112 'metmc' : [5.1, 15, 25, 37, 38, 39],
113 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
118 usage =
'usage: runTheMatrix.py --show -s ' 120 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
122 parser.add_argument(
'-b',
'--batchName',
123 help=
'relval batch: suffix to be appended to Campaign name',
127 parser.add_argument(
'-m',
'--memoryOffset',
128 help=
'memory of the wf for single core',
133 parser.add_argument(
'--addMemPerCore',
134 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
139 parser.add_argument(
'-j',
'--nproc',
140 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
145 parser.add_argument(
'-t',
'--nThreads',
146 help=
'number of threads per process to use in cmsRun.',
151 parser.add_argument(
'--nStreams',
152 help=
'number of streams to use in cmsRun.',
157 parser.add_argument(
'--nEvents',
158 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
163 parser.add_argument(
'--numberEventsInLuminosityBlock',
164 help=
'number of events in a luminosity block',
165 dest=
'numberEventsInLuminosityBlock',
169 parser.add_argument(
'-n',
'--showMatrix',
170 help=
'Only show the worflows. Use --ext to show more',
175 parser.add_argument(
'-e',
'--extended',
176 help=
'Show details of workflows, used with --show',
181 parser.add_argument(
'-s',
'--selected',
182 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
187 parser.add_argument(
'-l',
'--list',
188 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
192 parser.add_argument(
'-f',
'--failed-from',
193 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
197 parser.add_argument(
'-r',
'--raw',
198 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
201 parser.add_argument(
'-i',
'--useInput',
202 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
204 type=
lambda x: x.split(
','),
207 parser.add_argument(
'-w',
'--what',
208 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
212 parser.add_argument(
'--step1',
213 help=
'Used with --raw. Limit the production to step1',
217 parser.add_argument(
'--maxSteps',
218 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
223 parser.add_argument(
'--fromScratch',
224 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
226 type=
lambda x: x.split(
','),
229 parser.add_argument(
'--refRelease',
230 help=
'Allow to modify the recycling dataset version',
234 parser.add_argument(
'--wmcontrol',
235 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
236 choices=[
'init',
'test',
'submit',
'force'],
240 parser.add_argument(
'--revertDqmio',
241 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
242 choices=[
'yes',
'no'],
246 parser.add_argument(
'--optionswm',
247 help=
'Specify a few things for wm injection',
251 parser.add_argument(
'--keep',
252 help=
'allow to specify for which comma separated steps the output is needed',
255 parser.add_argument(
'--label',
256 help=
'allow to give a special label to the output dataset name',
259 parser.add_argument(
'--command',
260 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
265 parser.add_argument(
'--apply',
266 help=
'allow to use the --command only for 1 comma separeated',
270 parser.add_argument(
'--workflow',
271 help=
'define a workflow to be created or altered from the matrix',
276 parser.add_argument(
'--dryRun',
277 help=
'do not run the wf at all',
282 parser.add_argument(
'--testbed',
283 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
288 parser.add_argument(
'--noCafVeto',
289 help=
'Run from any source, ignoring the CAF label',
292 action=
'store_false')
294 parser.add_argument(
'--overWrite',
295 help=
'Change the content of a step for another. List of pairs.',
299 parser.add_argument(
'--noRun',
300 help=
'Remove all run list selection from wfs',
305 parser.add_argument(
'--das-options',
306 help=
'Options to be passed to dasgoclient.',
311 parser.add_argument(
'--job-reports',
312 help=
'Dump framework job reports',
317 parser.add_argument(
'--ibeos',
318 help=
'Use IB EOS site configuration',
323 parser.add_argument(
'--sites',
324 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
326 default=
'T2_CH_CERN',
329 parser.add_argument(
'--interactive',
330 help=
"Open the Matrix interactive shell",
334 parser.add_argument(
'--dbs-url',
335 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
340 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
342 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
343 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
345 choices=[
'forbidden',
'optional',
'required'],
351 gpugroup.add_argument(
'--gpu-memory',
352 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
357 gpugroup.add_argument(
'--cuda-capabilities',
358 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
359 dest=
'CUDACapabilities',
360 type=
lambda x: x.split(
','),
361 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
364 cudart_version =
None 365 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
366 if os.path.isfile(libcudart):
367 cudart_basename = os.path.basename(libcudart)
368 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
369 gpugroup.add_argument(
'--cuda-runtime',
370 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
372 default=cudart_version)
374 gpugroup.add_argument(
'--force-gpu-name',
375 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
379 gpugroup.add_argument(
'--force-cuda-driver-version',
380 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
381 dest=
'CUDADriverVersion',
384 gpugroup.add_argument(
'--force-cuda-runtime-version',
385 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
386 dest=
'CUDARuntimeVersion',
389 opt = parser.parse_args()
390 if opt.command: opt.command =
' '.
join(opt.command)
391 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
394 with open(opt.failed_from,
'r') as report: 395 for report_line
in report:
396 if 'FAILED' in report_line:
397 to_run,_=report_line.split(
'_',1)
398 rerunthese.append(to_run)
400 opt.testList+=
','.
join([
'']+rerunthese)
402 opt.testList =
','.
join(rerunthese)
405 from subprocess
import getstatusoutput
as run_cmd
407 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
408 if not os.path.exists(ibeos_cache):
409 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
411 run_cmd(
"rm -f %s" % ibeos_cache)
412 print(
"Error: Unable to download ibeos cache information")
416 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
417 cmssw_base = os.getenv(cmssw_env,
None)
418 if not cmssw_base:
continue 419 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
420 if os.path.exists(cmssw_base):
421 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
422 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 423 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 424 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 425 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
428 print(
'Deprecated, please use -l limited')
429 if opt.testList: opt.testList+=
',limited' 430 else: opt.testList=
'limited' 438 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
440 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
444 for entry
in opt.testList.split(
','):
445 if not entry:
continue 447 for k
in predefinedSet:
448 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
449 testList.extend(predefinedSet[k])
454 testList.append(
float(entry))
456 print(entry,
'is not a possible selected entry')
458 opt.testList = list(set(testList))
463 opt.overWrite=eval(opt.overWrite)
466 from colorama
import Fore, Style
467 from os
import isatty
472 intro =
"Welcome to the Matrix (? for help)" 476 cmd.Cmd.__init__(self)
481 for what
in tmp.files:
482 what = what.replace(
'relval_',
'')
483 self.
opt_.what = what
486 self.
opt_.fromScratch)
490 """Clear the screen, put prompt at the top""" 494 print(
"Leaving the Matrix")
498 if inp ==
'x' or inp ==
'q':
501 is_pipe =
not isatty(sys.stdin.fileno())
502 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
508 print(
"\n".
join([
"predefined [predef1 [...]]\n",
509 "Run w/o argument, it will print the list of known predefined workflows.",
510 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
513 if text
and len(text) > 0:
514 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
516 return predefinedSet.keys()
519 """Print the list of predefined workflows""" 520 print(
"List of predefined workflows")
522 for w
in arg.split():
523 if w
in predefinedSet.keys():
524 print(
"Predefined Set: %s" % w)
525 print(predefinedSet[w])
527 print(
"Unknown Set: %s" % w)
529 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
532 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
533 "Run w/o arguments, it will print the list of registered macro-workflows.",
534 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
537 if text
and len(text) > 0:
538 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
544 print(
"Available workflows:")
546 print(Fore.RED + Style.BRIGHT + k)
547 print(Style.RESET_ALL)
549 selected = arg.split()
552 print(
"Unknown workflow %s: skipping" % k)
555 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
556 Fore.GREEN + wfl.nameId + Fore.RESET))
557 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
563 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
564 print(Style.RESET_ALL)
566 workflow_class = args[0]
567 workflow_id = args[1]
568 passed_down_args = list()
570 passed_down_args = args[2:]
571 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
572 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
573 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
574 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
575 print(Style.RESET_ALL)
577 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
578 print(
"Available workflows:")
580 print(Fore.RED + Style.BRIGHT + k)
581 print(Style.RESET_ALL)
583 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
584 if float(workflow_id)
not in wflnums:
585 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
586 print(Fore.GREEN + Style.BRIGHT)
588 print(Style.RESET_ALL)
592 if self.
processes_[workflow_id][0].poll()
is None:
593 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
594 print(Style.RESET_ALL)
598 lognames = [
'stdout',
'stderr']
599 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
600 stdout = open(logfiles[0],
'w')
601 stderr = open(logfiles[1],
'w')
602 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
603 if len(passed_down_args) > 0:
604 command +=
tuple(passed_down_args)
606 p = subprocess.Popen(command,
609 self.
processes_[workflow_id] = (p, time.time())
613 if text
and len(text) > 0:
614 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
619 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
620 "This command will launch a new and independent process that invokes",
622 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
623 "\nYou can specify just one workflow_class and workflow_id per invocation.",
624 "The job will continue even after quitting the interactive session.",
625 "stdout and stderr of the new process will be automatically",
626 "redirected to 2 logfiles whose names contain the workflow_class",
627 "and workflow_id. Mutiple command can be issued one after the other.",
628 "The working directory of the new process will be the directory",
629 "from which the interactive session has started.",
630 "Autocompletion is available for workflow_class, but",
631 "not for workflow_id. Supplying a wrong workflow_class or",
632 "a non-existing workflow_id for a valid workflow_class",
633 "will trigger an error and no process will be invoked.",
634 "The interactive shell will keep track of all active processes",
635 "and will prevent the accidental resubmission of an already",
639 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
642 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
644 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
645 print(Style.RESET_ALL)
648 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
649 "in the ongoing interactive session"]))
652 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
653 "This command will search for a match within all workflows registered to wfl_name.",
654 "The search is done on both the workflow name and the names of steps registered to it."]))
657 if text
and len(text) > 0:
658 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
665 print(
"searchInWorkflow name regexp")
668 print(
"Unknown workflow")
673 pattern = re.compile(args[1])
675 print(
"Failed to compile regexp %s" % args[1])
678 for wfl
in self.
matrices_[args[0]].workFlows:
679 if re.match(pattern, wfl.nameId):
680 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
681 Fore.GREEN + wfl.nameId + Fore.RESET))
683 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
684 Fore.YELLOW +
str(args[0])) + Fore.RESET)
687 print(
"\n".
join([
"search search_regexp\n",
688 "This command will search for a match within all workflows registered.",
689 "The search is done on both the workflow name and the names of steps registered to it."]))
694 print(
"search regexp")
700 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
701 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
706 print(
"dumpWorkflowId [wfl-id1 [...]]")
714 for wfl
in mrd.workFlows:
715 if wfl.numId ==
float(wflid):
718 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
719 for i,s
in enumerate(wfl.cmds):
720 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
722 print(
"\nWorkflow found in %s." % key)
724 print(
"Workflow also found in %s." % key)
731 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)