2 from __future__
import print_function
5 from Configuration.PyReleaseValidation.MatrixReader
import MatrixReader
6 from Configuration.PyReleaseValidation.MatrixRunner
import MatrixRunner
7 from Configuration.PyReleaseValidation.MatrixInjector
import MatrixInjector,performInjectionOptionTest
14 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
23 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
27 definedWf = [dwf.numId
for dwf
in mrd.workFlows]
28 definedSet = set(definedWf)
29 testSet = set(opt.testList)
30 undefSet = testSet - definedSet
31 if len(undefSet)>0:
raise ValueError(
'Undefined workflows: '+
', '.
join(
map(str,list(undefSet))))
32 duplicates = [wf
for wf
in testSet
if definedWf.count(wf)>1 ]
33 if len(duplicates)>0:
raise ValueError(
'Duplicated workflows: '+
', '.
join(
map(str,list(duplicates))))
37 mrd.show(opt.testList, opt.extended, opt.cafVeto)
38 if opt.testList :
print(
'selected items:', opt.testList)
40 mRunnerHi =
MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads)
41 ret = mRunnerHi.runTests(opt)
45 print(
'Cannot go on with wmagent injection with failing workflows')
47 wfInjector =
MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
48 ret= wfInjector.prepare(mrd,
57 if __name__ ==
'__main__':
135 'jetmc': [5.1, 13, 15, 25, 38, 39],
136 'metmc' : [5.1, 15, 25, 37, 38, 39],
137 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
142 usage =
'usage: runTheMatrix.py --show -s ' 144 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
146 parser.add_argument(
'-b',
'--batchName',
147 help=
'relval batch: suffix to be appended to Campaign name',
151 parser.add_argument(
'-m',
'--memoryOffset',
152 help=
'memory of the wf for single core',
157 parser.add_argument(
'--addMemPerCore',
158 help=
'increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
163 parser.add_argument(
'-j',
'--nproc',
164 help=
'number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
169 parser.add_argument(
'-t',
'--nThreads',
170 help=
'number of threads per process to use in cmsRun.',
175 parser.add_argument(
'--nStreams',
176 help=
'number of streams to use in cmsRun.',
181 parser.add_argument(
'--nEvents',
182 help=
'number of events to process in cmsRun. If 0 will use the standard 10 events.',
187 parser.add_argument(
'--numberEventsInLuminosityBlock',
188 help=
'number of events in a luminosity block',
189 dest=
'numberEventsInLuminosityBlock',
193 parser.add_argument(
'-n',
'--showMatrix',
194 help=
'Only show the worflows. Use --ext to show more',
199 parser.add_argument(
'-e',
'--extended',
200 help=
'Show details of workflows, used with --show',
205 parser.add_argument(
'-s',
'--selected',
206 help=
'Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
211 parser.add_argument(
'-l',
'--list',
212 help=
'Comma separated list of workflow to be shown or ran. Possible keys are also '+
str(predefinedSet.keys())+
'. and wild card like muon, or mc',
216 parser.add_argument(
'-f',
'--failed-from',
217 help=
'Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
221 parser.add_argument(
'-r',
'--raw',
222 help=
'Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
225 parser.add_argument(
'-i',
'--useInput',
226 help=
'Use recyling where available. Either all, or a comma separated list of wf number.',
228 type=
lambda x: x.split(
','),
231 parser.add_argument(
'-w',
'--what',
232 help=
'Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
236 parser.add_argument(
'--step1',
237 help=
'Used with --raw. Limit the production to step1',
241 parser.add_argument(
'--maxSteps',
242 help=
'Only run maximum on maxSteps. Used when we are only interested in first n steps.',
247 parser.add_argument(
'--fromScratch',
248 help=
'Comma separated list of wf to be run without recycling. all is not supported as default.',
250 type=
lambda x: x.split(
','),
253 parser.add_argument(
'--refRelease',
254 help=
'Allow to modify the recycling dataset version',
258 parser.add_argument(
'--wmcontrol',
259 help=
'Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
260 choices=[
'init',
'test',
'submit',
'force'],
264 parser.add_argument(
'--revertDqmio',
265 help=
'When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
266 choices=[
'yes',
'no'],
270 parser.add_argument(
'--optionswm',
271 help=
'Specify a few things for wm injection',
275 parser.add_argument(
'--keep',
276 help=
'allow to specify for which comma separated steps the output is needed',
279 parser.add_argument(
'--label',
280 help=
'allow to give a special label to the output dataset name',
283 parser.add_argument(
'--command',
284 help=
'provide a way to add additional command to all of the cmsDriver commands in the matrix',
289 parser.add_argument(
'--apply',
290 help=
'allow to use the --command only for 1 comma separeated',
294 parser.add_argument(
'--workflow',
295 help=
'define a workflow to be created or altered from the matrix',
300 parser.add_argument(
'--dryRun',
301 help=
'do not run the wf at all',
306 parser.add_argument(
'--testbed',
307 help=
'workflow injection to cmswebtest (you need dedicated rqmgr account)',
312 parser.add_argument(
'--noCafVeto',
313 help=
'Run from any source, ignoring the CAF label',
316 action=
'store_false')
318 parser.add_argument(
'--overWrite',
319 help=
'Change the content of a step for another. List of pairs.',
323 parser.add_argument(
'--noRun',
324 help=
'Remove all run list selection from wfs',
329 parser.add_argument(
'--das-options',
330 help=
'Options to be passed to dasgoclient.',
335 parser.add_argument(
'--job-reports',
336 help=
'Dump framework job reports',
341 parser.add_argument(
'--ibeos',
342 help=
'Use IB EOS site configuration',
347 parser.add_argument(
'--sites',
348 help=
'Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
350 default=
'T2_CH_CERN',
353 parser.add_argument(
'--interactive',
354 help=
"Open the Matrix interactive shell",
358 parser.add_argument(
'--dbs-url',
359 help=
'Overwrite DbsUrl value in JSON submitted to ReqMgr2',
364 gpugroup = parser.add_argument_group(
'GPU-related options',
'These options are only meaningful when --gpu is used, and is not set to forbidden.')
366 gpugroup.add_argument(
'--gpu',
'--requires-gpu',
367 help=
'Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
369 choices=[
'forbidden',
'optional',
'required'],
375 gpugroup.add_argument(
'--gpu-memory',
376 help=
'Specify the minimum amount of GPU memory required by the job, in MB.',
381 gpugroup.add_argument(
'--cuda-capabilities',
382 help=
'Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
383 dest=
'CUDACapabilities',
384 type=
lambda x: x.split(
','),
385 default=
'6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6')
388 cudart_version =
None 389 libcudart = os.path.realpath(os.path.expandvars(
'$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
390 if os.path.isfile(libcudart):
391 cudart_basename = os.path.basename(libcudart)
392 cudart_version =
'.'.
join(cudart_basename.split(
'.')[2:4])
393 gpugroup.add_argument(
'--cuda-runtime',
394 help=
'Specify major and minor version of the CUDA runtime used to build the application.',
396 default=cudart_version)
398 gpugroup.add_argument(
'--force-gpu-name',
399 help=
'Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
403 gpugroup.add_argument(
'--force-cuda-driver-version',
404 help=
'Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
405 dest=
'CUDADriverVersion',
408 gpugroup.add_argument(
'--force-cuda-runtime-version',
409 help=
'Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
410 dest=
'CUDARuntimeVersion',
413 opt = parser.parse_args()
414 if opt.command: opt.command =
' '.
join(opt.command)
415 os.environ[
"CMSSW_DAS_QUERY_SITES"]=opt.dasSites
418 with open(opt.failed_from,
'r') as report: 419 for report_line
in report:
420 if 'FAILED' in report_line:
421 to_run,_=report_line.split(
'_',1)
422 rerunthese.append(to_run)
424 opt.testList+=
','.
join([
'']+rerunthese)
426 opt.testList =
','.
join(rerunthese)
429 from subprocess
import getstatusoutput
as run_cmd
431 ibeos_cache = os.path.join(os.getenv(
"LOCALRT"),
"ibeos_cache.txt")
432 if not os.path.exists(ibeos_cache):
433 err, out = run_cmd(
"curl -L -s -o %s https://raw.githubusercontent.com/cms-sw/cms-sw.github.io/master/das_queries/ibeos.txt" % ibeos_cache)
435 run_cmd(
"rm -f %s" % ibeos_cache)
436 print(
"Error: Unable to download ibeos cache information")
440 for cmssw_env
in [
"CMSSW_BASE",
"CMSSW_RELEASE_BASE" ]:
441 cmssw_base = os.getenv(cmssw_env,
None)
442 if not cmssw_base:
continue 443 cmssw_base = os.path.join(cmssw_base,
"src/Utilities/General/ibeos")
444 if os.path.exists(cmssw_base):
445 os.environ[
"PATH"]=cmssw_base+
":"+os.getenv(
"PATH")
446 os.environ[
"CMS_PATH"]=
"/cvmfs/cms-ib.cern.ch" 447 os.environ[
"SITECONFIG_PATH"]=
"/cvmfs/cms-ib.cern.ch/SITECONF/local" 448 os.environ[
"CMSSW_USE_IBEOS"]=
"true" 449 print(
">> WARNING: You are using SITECONF from /cvmfs/cms-ib.cern.ch")
452 print(
'Deprecated, please use -l limited')
453 if opt.testList: opt.testList+=
',limited' 454 else: opt.testList=
'limited' 462 opt.apply=
map(stepOrIndex,opt.apply.split(
','))
464 opt.keep=
map(stepOrIndex,opt.keep.split(
','))
468 for entry
in opt.testList.split(
','):
469 if not entry:
continue 471 for k
in predefinedSet:
472 if k.lower().startswith(entry.lower())
or k.lower().endswith(entry.lower()):
473 testList.extend(predefinedSet[k])
478 testList.append(
float(entry))
480 print(entry,
'is not a possible selected entry')
482 opt.testList = list(set(testList))
487 opt.overWrite=eval(opt.overWrite)
490 from colorama
import Fore, Style
491 from os
import isatty
496 intro =
"Welcome to the Matrix (? for help)" 500 cmd.Cmd.__init__(self)
505 for what
in tmp.files:
506 what = what.replace(
'relval_',
'')
507 self.
opt_.what = what
510 self.
opt_.fromScratch)
514 """Clear the screen, put prompt at the top""" 518 print(
"Leaving the Matrix")
522 if inp ==
'x' or inp ==
'q':
525 is_pipe =
not isatty(sys.stdin.fileno())
526 print(Fore.RED +
"Error: " + Fore.RESET +
"unrecognized command.")
532 print(
"\n".
join([
"predefined [predef1 [...]]\n",
533 "Run w/o argument, it will print the list of known predefined workflows.",
534 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them"]))
537 if text
and len(text) > 0:
538 return [t
for t
in predefinedSet.keys()
if t.startswith(text)]
540 return predefinedSet.keys()
543 """Print the list of predefined workflows""" 544 print(
"List of predefined workflows")
546 for w
in arg.split():
547 if w
in predefinedSet.keys():
548 print(
"Predefined Set: %s" % w)
549 print(predefinedSet[w])
551 print(
"Unknown Set: %s" % w)
553 print(
"[ " + Fore.RED +
", ".
join([
str(k)
for k
in predefinedSet.keys()]) + Fore.RESET +
" ]")
556 print(
"\n".
join([
"showWorkflow [workflow1 [...]]\n",
557 "Run w/o arguments, it will print the list of registered macro-workflows.",
558 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them"]))
561 if text
and len(text) > 0:
562 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
568 print(
"Available workflows:")
570 print(Fore.RED + Style.BRIGHT + k)
571 print(Style.RESET_ALL)
573 selected = arg.split()
576 print(
"Unknown workflow %s: skipping" % k)
579 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
580 Fore.GREEN + wfl.nameId + Fore.RESET))
581 print(
"%s contains %d workflows" % (Fore.RED + k + Fore.RESET, len(self.
matrices_[k].workFlows)))
587 print(Fore.RED + Style.BRIGHT +
"Wrong number of parameters passed")
588 print(Style.RESET_ALL)
590 workflow_class = args[0]
591 workflow_id = args[1]
592 passed_down_args = list()
594 passed_down_args = args[2:]
595 print(Fore.YELLOW + Style.BRIGHT +
"Running with the following options:\n")
596 print(Fore.GREEN + Style.BRIGHT +
"Workflow class: {}".
format(workflow_class))
597 print(Fore.GREEN + Style.BRIGHT +
"Workflow ID: {}".
format(workflow_id))
598 print(Fore.GREEN + Style.BRIGHT +
"Additional runTheMatrix options: {}".
format(passed_down_args))
599 print(Style.RESET_ALL)
601 print(Fore.RED + Style.BRIGHT +
"Unknown workflow selected: {}".
format(workflow_class))
602 print(
"Available workflows:")
604 print(Fore.RED + Style.BRIGHT + k)
605 print(Style.RESET_ALL)
607 wflnums = [x.numId
for x
in self.
matrices_[workflow_class].workFlows]
608 if float(workflow_id)
not in wflnums:
609 print(Fore.RED + Style.BRIGHT +
"Unknown workflow {}".
format(workflow_id))
610 print(Fore.GREEN + Style.BRIGHT)
612 print(Style.RESET_ALL)
616 if self.
processes_[workflow_id][0].poll()
is None:
617 print(Fore.RED + Style.BRIGHT +
"Workflow {} already running!".
format(workflow_id))
618 print(Style.RESET_ALL)
622 lognames = [
'stdout',
'stderr']
623 logfiles =
tuple(
'%s_%s_%s.log' % (workflow_class, workflow_id, name)
for name
in lognames)
624 stdout = open(logfiles[0],
'w')
625 stderr = open(logfiles[1],
'w')
626 command = (
'runTheMatrix.py',
'-w', workflow_class,
'-l', workflow_id)
627 if len(passed_down_args) > 0:
628 command +=
tuple(passed_down_args)
630 p = subprocess.Popen(command,
633 self.
processes_[workflow_id] = (p, time.time())
637 if text
and len(text) > 0:
638 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
643 print(
"\n".
join([
"runWorkflow workflow_class workflow_id\n",
644 "This command will launch a new and independent process that invokes",
646 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
647 "\nYou can specify just one workflow_class and workflow_id per invocation.",
648 "The job will continue even after quitting the interactive session.",
649 "stdout and stderr of the new process will be automatically",
650 "redirected to 2 logfiles whose names contain the workflow_class",
651 "and workflow_id. Mutiple command can be issued one after the other.",
652 "The working directory of the new process will be the directory",
653 "from which the interactive session has started.",
654 "Autocompletion is available for workflow_class, but",
655 "not for workflow_id. Supplying a wrong workflow_class or",
656 "a non-existing workflow_id for a valid workflow_class",
657 "will trigger an error and no process will be invoked.",
658 "The interactive shell will keep track of all active processes",
659 "and will prevent the accidental resubmission of an already",
663 print(Fore.GREEN + Style.BRIGHT +
"List of jobs:")
666 print(Fore.YELLOW + Style.BRIGHT +
"Active job: {} since {:.2f} seconds.".
format(w, time.time() - self.
processes_[w][1]))
668 print(Fore.RED + Style.BRIGHT +
"Done job: {}".
format(w))
669 print(Style.RESET_ALL)
672 print(
"\n".
join([
"Print a full list of active and done jobs submitted",
673 "in the ongoing interactive session"]))
676 print(
"\n".
join([
"searchInWorkflow wfl_name search_regexp\n",
677 "This command will search for a match within all workflows registered to wfl_name.",
678 "The search is done on both the workflow name and the names of steps registered to it."]))
681 if text
and len(text) > 0:
682 return [t
for t
in self.
matrices_.
keys()
if t.startswith(text)]
689 print(
"searchInWorkflow name regexp")
692 print(
"Unknown workflow")
697 pattern = re.compile(args[1])
699 print(
"Failed to compile regexp %s" % args[1])
702 for wfl
in self.
matrices_[args[0]].workFlows:
703 if re.match(pattern, wfl.nameId):
704 print(
"%s %s" % (Fore.BLUE +
str(wfl.numId) + Fore.RESET,
705 Fore.GREEN + wfl.nameId + Fore.RESET))
707 print(
"Found %s compatible workflows inside %s" % (Fore.RED +
str(counter) + Fore.RESET,
708 Fore.YELLOW +
str(args[0])) + Fore.RESET)
711 print(
"\n".
join([
"search search_regexp\n",
712 "This command will search for a match within all workflows registered.",
713 "The search is done on both the workflow name and the names of steps registered to it."]))
718 print(
"search regexp")
724 print(
"\n".
join([
"dumpWorkflowId [wfl-id1 [...]]\n",
725 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input."]))
730 print(
"dumpWorkflowId [wfl-id1 [...]]")
738 for wfl
in mrd.workFlows:
739 if wfl.numId ==
float(wflid):
742 print(Fore.GREEN +
str(wfl.numId) + Fore.RESET +
" " + Fore.YELLOW + wfl.nameId + Fore.RESET)
743 for i,s
in enumerate(wfl.cmds):
744 print(fmt % (Fore.RED +
str(i+1) + Fore.RESET,
746 print(
"\nWorkflow found in %s." % key)
748 print(
"Workflow also found in %s." % key)
755 if opt.raw
and opt.show:
def performInjectionOptionTest(opt)
def do_predefined(self, arg)
def do_showWorkflow(self, arg)
def complete_showWorkflow(self, text, line, start_idx, end_idx)
def help_showWorkflow(self)
def help_dumpWorkflowId(self)
def complete_predefined(self, text, line, start_idx, end_idx)
def help_searchInWorkflow(self)
def do_dumpWorkflowId(self, arg)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def help_runWorkflow(self)
def help_predefined(self)
static std::string join(char **cmd)
def complete_searchInWorkflow(self, text, line, start_idx, end_idx)
def complete_runWorkflow(self, text, line, start_idx, end_idx)
def do_runWorkflow(self, arg)
def do_searchInWorkflow(self, arg)