10 from collections
import namedtuple
11 from collections
import defaultdict
12 from multiprocessing.pool
import ThreadPool
# One cmsDriver configuration to inspect: a sequence name within a step, plus
# the era/scenario and the mc/data/fast switches handed to cmsDriver.
Sequence = namedtuple(
    "Sequence",
    ("seqname", "step", "era", "scenario", "mc", "data", "fast"),
)
25 INFILE =
"/store/data/Run2018A/EGamma/RAW/v1/000/315/489/00000/004D960A-EA4C-E811-A908-FA163ED1F481.root" 28 BLACKLIST=
'^(TriggerResults|.*_step|DQMoutput|siPixelDigis)$' 33 @functools.lru_cache(maxsize=
None)
39 wd = tempfile.mkdtemp()
42 with open(wd +
"/gdb",
"w"):
44 os.chmod(wd +
"/gdb", 0o700)
45 env = os.environ.copy()
46 env[
"PATH"] = wd +
":" + env[
"PATH"]
52 "--conditions",
"auto:run2_data",
53 "-s", seq.step+sep+seq.seqname,
55 "--mc" if seq.mc
else "",
"--data" if seq.data
else "",
"--fast" if seq.fast
else "",
56 "--era" if seq.era
else "", seq.era,
57 "--eventcontent",
"DQM",
"--scenario" if seq.scenario
else "", seq.scenario,
58 "--datatier",
"DQMIO",
59 "--customise_commands",
'process.Tracer = cms.Service("Tracer")',
60 "--filein", INFILE,
"-n",
"0",
61 "--python_filename",
"cmssw_cfg.py",
"--no_exec" 64 driverargs = [x
for x
in driverargs
if x]
65 subprocess.check_call(driverargs, cwd=wd, stdout=2)
68 proc = subprocess.Popen([
"cmsRun",
"cmssw_cfg.py"], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, cwd=wd, env=env)
69 tracedump, _ = proc.communicate()
72 if proc.returncode
and seq.step
not in (
"HARVESTING",
"ALCAHARVEST"):
73 raise Exception(
"cmsRun failed for cmsDriver command %s" % driverargs)
75 lines = tracedump.splitlines()
76 labelre = re.compile(b
"[+]+ starting: constructing module with label '(\w+)'")
77 blacklistre = re.compile(BLACKLIST)
80 m = labelre.match(line)
82 label = m.group(1).
decode()
83 if blacklistre.match(label):
87 modules = set(modules)
90 configdump = subprocess.check_output([
"edmConfigDump",
"cmssw_cfg.py"], cwd=wd)
91 lines = configdump.splitlines()
92 modulere = re.compile(b
'process[.](.*) = cms.ED.*\("(.*)",')
100 modconfig[inconfig] += b
'\n' + line
105 m = modulere.match(line)
107 label = m.group(1).
decode()
108 plugin = m.group(2).
decode()
110 modclass[label] = plugin
111 modconfig[label] = line
115 plugininfo = tp.map(getplugininfo, modclass.values())
120 return modconfig, modclass, dict(plugininfo)
@functools.lru_cache(maxsize=None)
def getplugininfo(pluginname):
    """Look up the EDM base type of a plugin via ``edmPluginHelp``.

    Runs ``edmPluginHelp -p <pluginname>`` and parses the first line of its
    output for a parenthesised ``family::base`` (or bare ``base``) type.
    Results are memoised per plugin name.

    Args:
        pluginname: Plugin class name as it appears in the config dump.

    Returns:
        ``(pluginname, (family, base))``. ``family`` is ``None`` when the
        type carries no ``family::`` prefix. The type is ``("", "")`` when
        the tool printed nothing or the first line could not be parsed.

    Raises:
        subprocess.CalledProcessError: if ``edmPluginHelp`` exits non-zero.
    """
    plugindump = subprocess.check_output(["edmPluginHelp", "-p", pluginname])
    outlines = plugindump.splitlines()
    if not outlines:
        # No output at all: treat like an unparsable line instead of raising
        # IndexError on the [0] lookup.
        return (pluginname, ("", ""))
    line = outlines[0].decode()

    # The plugin name is literal text, so escape it before embedding it in
    # the pattern; raw strings keep \w from being read as a string escape.
    pluginre = re.compile(
        r".* " + re.escape(pluginname) + r".*[(]((\w+)::)?(\w+)[)]")
    m = pluginre.match(line)
    if not m:
        return (pluginname, ("", ""))
    return (pluginname, (m.group(2), m.group(3)))
140 for label
in modclass.keys():
145 row.append(modclass[label])
147 row.append(
"::".
join(plugininfo[modclass[label]]))
149 row.append(modconfig[label].
decode())
150 out.append(
tuple(row))
151 for row
in sorted(set(out)):
# Comma-separated column list of a Sequence, and a matching list of "?"
# placeholders, for SQL statements that read or write a whole Sequence tuple.
SEQFIELDS = ",".join(Sequence._fields)
SEQPLACEHOLDER = ",".join("?" * len(Sequence._fields))
159 CREATE TABLE IF NOT EXISTS plugin(classname, edmfamily, edmbase); 160 CREATE UNIQUE INDEX IF NOT EXISTS plugins ON plugin(classname); 161 CREATE TABLE IF NOT EXISTS module(id INTEGER PRIMARY KEY, classname, instancename, variation, config); 162 CREATE UNIQUE INDEX IF NOT EXISTS modules ON module(instancename, variation); 163 CREATE UNIQUE INDEX IF NOT EXISTS configs ON module(config); 164 CREATE TABLE IF NOT EXISTS sequence(id INTEGER PRIMARY KEY, {SEQFIELDS}); 165 CREATE UNIQUE INDEX IF NOT EXISTS squences ON sequence({SEQFIELDS}); 166 CREATE TABLE IF NOT EXISTS workflow(wfid, sequenceid); 167 CREATE UNIQUE INDEX IF NOT EXISTS wrokflows ON workflow(sequenceid, wfid); 168 CREATE TABLE IF NOT EXISTS sequencemodule(moduleid, sequenceid); 172 with sqlite3.connect(DBFILE)
as db:
174 cur.executescript(DBSCHEMA)
176 seqid = list(cur.execute(f
"SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER});", (seq)))
180 cur.execute(
"BEGIN;")
182 cur.execute(
"CREATE TEMP TABLE newmodules(instancename, classname, config);")
183 cur.executemany(
"INSERT INTO newmodules VALUES (?, ?, ?)", ((label, modclass[label], modconfig[label])
for label
in modconfig))
186 INSERT OR IGNORE INTO module(classname, instancename, variation, config) 187 SELECT classname, instancename, 188 (SELECT count(*) FROM module AS existing WHERE existing.instancename = newmodules.instancename), 189 config FROM newmodules; 193 cur.executemany(
"INSERT OR IGNORE INTO plugin VALUES (?, ?, ?);", ((plugin, edm[0], edm[1])
for plugin, edm
in plugininfo.items()))
195 cur.execute(f
"INSERT OR FAIL INTO sequence({SEQFIELDS}) VALUES({SEQPLACEHOLDER});", (seq))
196 seqid = list(cur.execute(f
"SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER});", (seq)))
198 cur.executemany(
"INSERT INTO sequencemodule SELECT id, ? FROM module WHERE config = ?;", ((seqid, modconfig[label])
for label
in modconfig))
199 cur.execute(
"COMMIT;")
202 with sqlite3.connect(DBFILE)
as db:
204 cur.execute(
"BEGIN;")
205 cur.executescript(DBSCHEMA)
206 pairs = [[wf] + list(seq)
for wf, seqlist
in seqs.items()
for seq
in seqlist]
207 cur.executemany(f
"INSERT OR IGNORE INTO workflow SELECT ?, (SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER}));", pairs)
208 cur.execute(
"COMMIT;")
216 sequences = defaultdict(list)
219 stepdump = subprocess.check_output([
"runTheMatrix.py",
"-l",
str(wfnumber),
"-ne"])
221 stepdump = subprocess.check_output([
"runTheMatrix.py",
"-ne"])
223 lines = stepdump.splitlines()
225 workflowre = re.compile(b
"^([0-9]+.[0-9]+) ")
228 m = workflowre.match(line)
230 workflow = m.group(1).
decode()
234 if not b
'cmsDriver.py' in line:
continue 236 args = list(reversed(line.decode().
split(
" ")))
247 if item ==
'--scenario':
248 scenario = args.pop()
257 steps = step.split(
",")
259 s = step.split(
":")[0]
260 if s
in RELEVANTSTEPS:
263 seqs = step.split(
":")[1]
264 for seq
in seqs.split(
"+"):
265 sequences[workflow].
append(
Sequence(seq, s, era, scenario, mc, data, fast))
267 sequences[workflow].
append(
Sequence(
"", s, era, scenario, mc, data, fast))
272 tasks = [stp.map_async(
lambda seq: (seq,
inspectsequence(seq)), [seq])
for seq
in seqs]
286 if not t.successful():
287 print(
"Task failed.")
299 db = sqlite3.connect(DBFILE)
def formatseq(seq):
    """Render a Sequence as ``STEP:name era scenario`` plus its set switches.

    ``--mc``, ``--data`` and ``--fast`` are appended, in that order, for each
    of the corresponding fields that is truthy.
    """
    switches = [
        flag
        for flag, enabled in (
            (" --mc", seq.mc),
            (" --data", seq.data),
            (" --fast", seq.fast),
        )
        if enabled
    ]
    head = [seq.step, ":", seq.seqname, " ", seq.era, " ", seq.scenario]
    return "".join(head + switches)
309 out.append(
"<H2>Sequences</H2><ul>")
310 out.append(
"""<p> A sequence name, given as <em>STEP:@sequencename</em> here, does not uniquely identify a sequence. 311 The modules on the sequence might depend on other cmsDriver options, such as Era, Scenario, Data vs. MC, etc. 312 This tool lists parameter combinations that were observed. However, sequences with identical contents are grouped 313 on this page. The default sequence, used when no explicit sequence is apssed to cmsDriver, is noted as <em>STEP:</em>.</p>""")
314 rows = cur.execute(f
"SELECT seqname, step, count(*) FROM sequence GROUP BY seqname, step ORDER BY seqname, step;")
316 seqname, step, count = row
318 out += showseq(step, seqname)
319 out.append(f
' </li>')
322 out.append(
"<H2>Modules</H2><ul>")
323 rows = cur.execute(f
"SELECT classname, edmfamily, edmbase FROM plugin ORDER BY edmfamily, edmbase, classname")
325 classname, edmfamily, edmbase = row
326 if not edmfamily: edmfamily =
"<em>legacy</em>" 327 out.append(f
' <li>{edmfamily}::{edmbase} <a href="/plugin/{classname}/">{classname}</a></li>')
331 def showseq(step, seqname):
335 out.append(f
' <a href="/seq/{step}:{seqname}/">{step}:{seqname}</a>')
339 rows = cur.execute(f
"SELECT {SEQFIELDS}, moduleid, id FROM sequence INNER JOIN sequencemodule ON sequenceid = id WHERE seqname = ? and step = ?;", (seqname, step))
341 seqs = defaultdict(list)
348 variations = defaultdict(list)
349 for seq, mods
in seqs.items():
353 for mods, seqs
in variations.items():
355 out.append(f
' <li>({count} modules):')
358 out.append(f
'<br><a href="/seqid/{seqid}">' + formatseq(seq) +
'</a>')
360 rows = cur.execute(
"SELECT wfid FROM workflow WHERE sequenceid = ?;", (seqid,))
361 out.append(f
'<em>Used on workflows: ' +
", ".
join(wfid
for wfid,
in rows) +
"</em>")
366 def showseqid(seqid):
371 rows = cur.execute(f
"SELECT {SEQFIELDS} FROM sequence WHERE id = ?;", (seqid,))
372 seq = formatseq(
Sequence(*list(rows)[0]))
373 out.append(f
"<h2>Modules on {seq}:</h2><ul>")
374 rows = cur.execute(
"SELECT wfid FROM workflow WHERE sequenceid = ?;", (seqid,))
375 out.append(
"<p><em>Used on workflows: " +
", ".
join(wfid
for wfid,
in rows) +
"</em></p>")
376 rows = cur.execute(
""" 377 SELECT classname, instancename, variation, moduleid 378 FROM sequencemodule INNER JOIN module ON moduleid = module.id 379 WHERE sequenceid = ?;""", (seqid,))
381 classname, instancename, variation, moduleid = row
382 out.append(f
'<li>{instancename} ' + (f
'<sub>{variation}</sub>' if variation
else '') + f
' : <a href="/plugin/{classname}/">{classname}</a></li>')
387 def showclass(classname):
392 out.append(f
"<h2>Plugin {classname}</h2>")
395 rows = cur.execute(
"SELECT edmfamily, edmbase FROM plugin WHERE classname = ?;", (classname,))
396 edmfamily, edmbase = list(rows)[0]
397 islegcay =
not edmfamily
398 if islegcay: edmfamily =
"<em>legacy</em>" 399 out.append(f
"<p>{classname} is a <b>{edmfamily}::{edmbase}</b>.</p>")
400 out.append(
"""<p>A module with a given label can have different configuration depending on options such as Era, 401 Scenario, Data vs. MC etc. If multiple configurations for the same name were found, they are listed separately 402 here and denoted using subscripts.</p>""")
403 if (edmbase !=
"EDProducer" and not (islegcay
and edmbase ==
"EDAnalyzer"))
or (islegcay
and edmbase ==
"EDProducer"):
404 out.append(f
"<p>This is not a DQM module.</p>")
407 rows = cur.execute(
""" 408 SELECT module.id, instancename, variation, sequenceid, step, seqname 409 FROM module INNER JOIN sequencemodule ON moduleid = module.id INNER JOIN sequence ON sequence.id == sequenceid 410 WHERE classname = ? ORDER BY instancename, variation, step, seqname;""", (classname,))
412 seqsformod = defaultdict(list)
415 id, instancename, variation, sequenceid, step, seqname = row
416 liformod[id] = f
'<a href="/config/{id}">{instancename}' + (f
"<sub>{variation}</sub>" if variation
else '') +
"</a>" 417 seqsformod[id].
append((sequenceid, f
"{step}:{seqname}"))
418 for id, li
in liformod.items():
419 out.append(
"<li>" + li +
' Used here: ' +
", ".
join(f
'<a href="/seqid/{seqid}">{name}</a>' for seqid, name
in seqsformod[id]) +
'.</li>')
423 def showconfig(modid):
428 rows = cur.execute(f
"SELECT config FROM module WHERE id = ?;", (modid,))
429 config = list(rows)[0][0]
431 out.append(config.decode())
# URL dispatch table: (compiled path pattern, page function) pairs. The
# request handler tries them in this order against the request path.
# Patterns are raw strings so \w and \d reach the regex engine verbatim
# instead of being parsed as (invalid) string escapes.
ROUTES = [
    (re.compile(r'/$'), index),
    (re.compile(r'/seq/(\w+):([@\w]*)/$'), showseq),
    (re.compile(r'/seqid/(\d+)$'), showseqid),
    (re.compile(r'/config/(\d+)$'), showconfig),
    (re.compile(r'/plugin/(.*)/$'), showclass),
]
444 class Handler(http.server.SimpleHTTPRequestHandler):
448 for pattern, func
in ROUTES:
449 m = pattern.match(self.path)
455 self.send_response(200,
"Here you go")
456 self.send_header(
"Content-Type",
"text/html; charset=utf-8")
458 self.wfile.
write(b
"""<html><style> 463 self.wfile.
write(res)
464 self.wfile.
write(b
"</body></html>")
466 self.send_response(400,
"Something went wrong")
467 self.send_header(
"Content-Type",
"text/plain; charset=utf-8")
469 self.wfile.
write(b
"I don't understand this request.")
471 trace = traceback.format_exc()
472 self.send_response(500,
"Things went very wrong")
473 self.send_header(
"Content-Type",
"text/plain; charset=utf-8")
475 self.wfile.
write(trace.encode(
"utf8"))
477 server_address = (
'', 8000)
478 httpd = http.server.HTTPServer(server_address, Handler)
479 print(
"Serving at http://localhost:8000/ ...")
480 httpd.serve_forever()
483 if __name__ ==
"__main__":
486 parser = argparse.ArgumentParser(description=
'Collect information about DQM sequences.')
487 parser.add_argument(
"--sequence", default=
"", help=
"Name of the sequence")
488 parser.add_argument(
"--step", default=
"DQM", help=
"cmsDriver step that the sequence applies to")
489 parser.add_argument(
"--era", default=
"Run2_2018", help=
"CMSSW Era to use")
490 parser.add_argument(
"--scenario", default=
"pp", help=
"cmsDriver scenario")
491 parser.add_argument(
"--data", default=
False, action=
"store_true", help=
"Pass --data to cmsDriver.")
492 parser.add_argument(
"--mc", default=
False, action=
"store_true", help=
"Pass --mc to cmsDriver.")
493 parser.add_argument(
"--fast", default=
False, action=
"store_true", help=
"Pass --fast to cmsDriver.")
494 parser.add_argument(
"--workflow", default=
None, help=
"Ignore other options and inspect this workflow instead (implies --sqlite).")
495 parser.add_argument(
"--runTheMatrix", default=
False, action=
"store_true", help=
"Ignore other options and inspect the full matrix instea (implies --sqlite).")
496 parser.add_argument(
"--steps", default=
"ALCA,ALCAPRODUCER,ALCAHARVEST,DQM,HARVESTING,VALIDATION", help=
"Which workflow steps to inspect from runTheMatrix.")
497 parser.add_argument(
"--sqlite", default=
False, action=
"store_true", help=
"Write information to SQLite DB instead of stdout.")
498 parser.add_argument(
"--dbfile", default=
"sequences.db", help=
"Name of the DB file to use.")
499 parser.add_argument(
"--infile", default=INFILE, help=
"LFN/PFN of input file to use. Default is %s" % INFILE)
500 parser.add_argument(
"--threads", default=
None, type=int, help=
"Use a fixed number of threads (default is #cores).")
501 parser.add_argument(
"--limit", default=
None, type=int, help=
"Process only this many sequences.")
502 parser.add_argument(
"--offset", default=
None, type=int, help=
"Process sequences starting from this index. Used with --limit to divide the work into jobs.")
503 parser.add_argument(
"--showpluginlabel", default=
False, action=
"store_true", help=
"Print the module label for each plugin (default).")
504 parser.add_argument(
"--showplugintype", default=
False, action=
"store_true", help=
"Print the base class for each plugin.")
505 parser.add_argument(
"--showpluginclass", default=
False, action=
"store_true", help=
"Print the class name for each plugin.")
506 parser.add_argument(
"--showpluginconfig", default=
False, action=
"store_true", help=
"Print the config dump for each plugin.")
507 parser.add_argument(
"--serve", default=
False, action=
"store_true", help=
"Ignore other options and instead serve HTML UI from SQLite DB.")
509 args = parser.parse_args()
511 RELEVANTSTEPS += args.steps.split(
",")
515 tp = ThreadPool(args.threads)
516 stp = ThreadPool(args.threads)
521 elif args.workflow
or args.runTheMatrix:
524 seqset = set(sum(seqs.values(), []))
526 seqset = list(sorted(seqset))[args.offset:]
528 seqset = list(sorted(seqset))[:args.limit]
530 print(
"Analyzing %d seqs..." % len(seqset))
536 seq =
Sequence(args.sequence, args.step, args.era, args.scenario, args.mc, args.data, args.fast)
542 if not (args.showpluginlabel
or args.showpluginclass
or args.showplugintype
or args.showpluginconfig):
543 args.showpluginlabel =
True 544 formatsequenceinfo(modconfig, modclass, plugininfo, args.showpluginlabel, args.showpluginclass, args.showplugintype, args.showpluginconfig)
def inspectworkflows(wfnumber)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def storesequenceinfo(seq, modconfig, modclass, plugininfo)
def split(sequence, size)
def formatsequenceinfo(modconfig, modclass, plugininfo, showlabel, showclass, showtype, showconfig)
static std::string join(char **cmd)
bool decode(bool &, std::string_view)
def getplugininfo(pluginname)