2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'selected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
134 'jetmc': [5.1, 13, 15, 25, 38, 39],
135 'metmc' : [5.1, 15, 25, 37, 38, 39],
136 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
141 usage =
'usage: runTheMatrix.py --show -s ' 143 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
145 parser.add_argument(
'-b',
'--batchName',
146 help=
'relval batch: suffix to be appended to Campaign name',
150 parser.add_argument(
'-m',
'--memoryOffset',
151 help=
'memory of the wf for single core',
156 parser.add_argument(
'--addMemPerCore',
157 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
162 parser.add_argument(
'-j',
'--nproc',
163 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
168 parser.add_argument(
'-t',
'--nThreads',
169 help=
'number of threads per process to use in cmsRun.',
174 parser.add_argument(
'--nStreams',
175 help=
'number of streams to use in cmsRun.',
180 parser.add_argument(
'--nEvents',
181 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
186 parser.add_argument(
'--numberEventsInLuminosityBlock',
187 help=
'number of events in a luminosity block',
188 dest=
'numberEventsInLuminosityBlock',
192 parser.add_argument(
'-n',
'--showMatrix',
193 help=
'Only show the worflows. Use --ext to show more',
198 parser.add_argument(
'-e',
'--extended',
199 help=
'Show details of workflows, used with --show',
204 parser.add_argument(
'-s',
'--selected',
205 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
210 parser.add_argument(
'-l',
'--list',
211 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
215 parser.add_argument(
'-f',
'--failed-from',
216 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
220 parser.add_argument(
'-r',
'--raw',
221 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
224 parser.add_argument(
'-i',
'--useInput',
225 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
227 type=
lambda x: x.split(
','),
230 parser.add_argument(
'-w',
'--what',
231 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
235 parser.add_argument(
'--step1',
236 help=
'Used with --raw. Limit the production to step1',
240 parser.add_argument(
'--maxSteps',
241 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
246 parser.add_argument(
'--fromScratch',
247 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
249 type=
lambda x: x.split(
','),
252 parser.add_argument(
'--refRelease',
253 help=
'Allow to modify the recycling dataset version',
257 parser.add_argument(
'--wmcontrol',
258 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
259 choices=[
'init',
'test',
'submit',
'force'],
263 parser.add_argument(
'--revertDqmio',
264 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
265 choices=[
'yes',
'no'],
269 parser.add_argument(
'--optionswm',
270 help=
'Specify a few things for wm injection',
274 parser.add_argument(
'--keep',
275 help=
'allow to specify for which comma separated steps the output is needed',
278 parser.add_argument(
'--label',
279 help=
'allow to give a special label to the output dataset name',
282 parser.add_argument(
'--command',
283 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
288 parser.add_argument(
'--apply',
289 help=
'allow to use the --command only for 1 comma separeated',
293 parser.add_argument(
'--workflow',
294 help=
'define a workflow to be created or altered from the matrix',
299 parser.add_argument(
'--dryRun',
300 help=
'do not run the wf at all',
305 parser.add_argument(
'--testbed',
306 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
311 parser.add_argument(
'--noCafVeto',
312 help=
'Run from any source, ignoring the CAF label',
315 action=
'store_false')
317 parser.add_argument(
'--overWrite',
318 help=
'Change the content of a step for another. List of pairs.',
322 parser.add_argument(
'--noRun',
323 help=
'Remove all run list selection from wfs',
328 parser.add_argument(
'--das-options',
329 help=
'Options to be passed to dasgoclient.',
334 parser.add_argument(
'--job-reports',
335 help=
'Dump framework job reports',
340 parser.add_argument(
'--ibeos',
341 help=
'Use IB EOS site configuration',
346 parser.add_argument(
'--sites',
347 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
349 default=
'T2_CH_CERN',
352 parser.add_argument(
'--interactive',
353 help=
"Open the Matrix interactive shell",
357 parser.add_argument(
'--dbs-url',
358 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
363 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
365 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
366 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
368 choices=[
'forbidden',
'optional',
'required'],
374 gpugroup.add_argument(
'--gpu-memory',
375 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
380 gpugroup.add_argument(
'--cuda-capabilities',
381 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
382 dest=
'CUDACapabilities',
383 type=
lambda x: x.split(
','),
384 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
387 cudart_version =
None 388 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
389 if os.path.isfile(libcudart):
390 cudart_basename = os.path.basename(libcudart)
391 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
392 gpugroup.add_argument(
'--cuda-runtime',
393 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
395 default=cudart_version)
397 gpugroup.add_argument(
'--force-gpu-name',
398 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
402 gpugroup.add_argument(
'--force-cuda-driver-version',
403 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
404 dest=
'CUDADriverVersion',
407 gpugroup.add_argument(
'--force-cuda-runtime-version',
408 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
409 dest=
'CUDARuntimeVersion',
412 opt = parser.parse_args()
413 if opt.command: opt.command =
' '.
join(opt.command)
414 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
417 with open(opt.failed_from,
'r') as report: 418 for report_line
in report:
419 if 'FAILED' in report_line:
420 to_run,_=report_line.split(
'_',1)
421 rerunthese.append(to_run)
423 opt.testList+=
','.
join([
'']+rerunthese)
425 opt.testList =
','.
join(rerunthese)
428 from subprocess
import getstatusoutput
as run_cmd
430 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
431 if not os.path.exists(ibeos_cache):
432 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
434 run_cmd(
"rm -f %s" % ibeos_cache)
435 print(
"Error: Unable to download ibeos cache information")
439 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
440 cmssw_base = os.getenv(cmssw_env,
None)
441 if not cmssw_base:
continue 442 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
443 if os.path.exists(cmssw_base):
444 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
445 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 446 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 447 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 448 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
451 print(
'Deprecated, please use -l limited')
452 if opt.testList: opt.testList+=
',limited' 453 else: opt.testList=
'limited' 461 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
463 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
467 for entry
in opt.testList.split(
','):
468 if not entry:
continue 470 for k
in predefinedSet:
471 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
472 testList.extend(predefinedSet[k])
477 testList.append(
float(entry))
479 print(entry,
'is not a possible selected entry')
481 opt.testList = list(set(testList))
486 opt.overWrite=eval(opt.overWrite)
489 from colorama
import Fore, Style
490 from os
import isatty
495 intro =
"Welcome to the Matrix (? for help)" 499 cmd.Cmd.__init__(self)
504 for what
in tmp.files:
505 what = what.replace(
'relval_',
'')
506 self.
opt_.what = what
509 self.
opt_.fromScratch)
513 """Clear the screen, put prompt at the top""" 517 print(
"Leaving the Matrix")
521 if inp ==
'x' or inp ==
'q':
524 is_pipe =
not isatty(sys.stdin.fileno())
525 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
531 print(
"\n".
join([
"predefined [predef1 [...]]\n",
532 "Run w/o argument, it will print the list of known predefined workflows.",
533 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
536 if text
and len(text) > 0:
537 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
539 return predefinedSet.keys()
542 """Print the list of predefined workflows""" 543 print(
"List of predefined workflows")
545 for w
in arg.split():
546 if w
in predefinedSet.keys():
547 print(
"Predefined Set: %s" % w)
548 print(predefinedSet[w])
550 print(
"Unknown Set: %s" % w)
552 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
555 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
556 "Run w/o arguments, it will print the list of registered macro-workflows.",
557 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
560 if text
and len(text) > 0:
561 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
567 print(
"Available workflows:")
569 print(Fore.RED + Style.BRIGHT + k)
570 print(Style.RESET_ALL)
572 selected = arg.split()
575 print(
"Unknown workflow %s: skipping" % k)
578 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
579 Fore.GREEN + wfl.nameId + Fore.RESET))
580 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
586 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
587 print(Style.RESET_ALL)
589 workflow_class = args[0]
590 workflow_id = args[1]
591 passed_down_args = list()
593 passed_down_args = args[2:]
594 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
595 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
596 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
597 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
598 print(Style.RESET_ALL)
600 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
601 print(
"Available workflows:")
603 print(Fore.RED + Style.BRIGHT + k)
604 print(Style.RESET_ALL)
606 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
607 if float(workflow_id)
not in wflnums:
608 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
609 print(Fore.GREEN + Style.BRIGHT)
611 print(Style.RESET_ALL)
615 if self.
processes_[workflow_id][0].poll()
is None:
616 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
617 print(Style.RESET_ALL)
621 lognames = [
'stdout',
'stderr']
622 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
623 stdout = open(logfiles[0],
'w')
624 stderr = open(logfiles[1],
'w')
625 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
626 if len(passed_down_args) > 0:
627 command +=
tuple(passed_down_args)
629 p = subprocess.Popen(command,
632 self.
processes_[workflow_id] = (p, time.time())
636 if text
and len(text) > 0:
637 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
642 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
643 "This command will launch a new and independent process that invokes",
645 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
646 "\nYou can specify just one workflow_class and workflow_id per invocation.",
647 "The job will continue even after quitting the interactive session.",
648 "stdout and stderr of the new process will be automatically",
649 "redirected to 2 logfiles whose names contain the workflow_class",
650 "and workflow_id. Mutiple command can be issued one after the other.",
651 "The working directory of the new process will be the directory",
652 "from which the interactive session has started.",
653 "Autocompletion is available for workflow_class, but",
654 "not for workflow_id. Supplying a wrong workflow_class or",
655 "a non-existing workflow_id for a valid workflow_class",
656 "will trigger an error and no process will be invoked.",
657 "The interactive shell will keep track of all active processes",
658 "and will prevent the accidental resubmission of an already",
662 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
665 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
667 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
668 print(Style.RESET_ALL)
671 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
672 "in the ongoing interactive session"]))
675 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
676 "This command will search for a match within all workflows registered to wfl_name.",
677 "The search is done on both the workflow name and the names of steps registered to it."]))
680 if text
and len(text) > 0:
681 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
688 print(
"searchInWorkflow name regexp")
691 print(
"Unknown workflow")
696 pattern = re.compile(args[1])
698 print(
"Failed to compile regexp %s" % args[1])
701 for wfl
in self.
matrices_[args[0]].workFlows:
702 if re.match(pattern, wfl.nameId):
703 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
704 Fore.GREEN + wfl.nameId + Fore.RESET))
706 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
707 Fore.YELLOW +
str(args[0])) + Fore.RESET)
710 print(
"\n".
join([
"search search_regexp\n",
711 "This command will search for a match within all workflows registered.",
712 "The search is done on both the workflow name and the names of steps registered to it."]))
717 print(
"search regexp")
723 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
724 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
729 print(
"dumpWorkflowId [wfl-id1 [...]]")
737 for wfl
in mrd.workFlows:
738 if wfl.numId ==
float(wflid):
741 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
742 for i,s
in enumerate(wfl.cmds):
743 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
745 print(
"\nWorkflow found in %s." % key)
747 print(
"Workflow also found in %s." % key)
754 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)