10 from collections
import namedtuple
11 from collections
import defaultdict
12 from multiprocessing.pool
import ThreadPool
# Immutable record describing one cmsDriver sequence to inspect: the sequence
# name and the step it belongs to, plus the era/scenario values and the
# mc/data/fast flags that are forwarded to cmsDriver as options.
Sequence = namedtuple("Sequence", "seqname step era scenario mc data fast")
# Input file handed to cmsDriver through its --filein option.
INFILE = (
    "/store/data/Run2018A/EGamma/RAW/v1/000/315/489/00000/"
    "004D960A-EA4C-E811-A908-FA163ED1F481.root"
)

# Regular expression that is matched against the module labels extracted from
# the Tracer output; labels that match get special treatment by the caller.
BLACKLIST = r'^(TriggerResults|.*_step|DQMoutput|siPixelDigis)$'
33 @functools.lru_cache(maxsize=
None)
39 if seq.step
not in (
"HARVESTING",
"ALCAHARVEST"):
42 otherstep =
"RAW2DIGI:siPixelDigis,"
44 wd = tempfile.mkdtemp()
47 with open(wd +
"/gdb",
"w"):
49 os.chmod(wd +
"/gdb", 0o700)
50 env = os.environ.copy()
51 env[
"PATH"] = wd +
":" + env[
"PATH"]
57 "--conditions",
"auto:run2_data",
58 "-s", otherstep+seq.step+sep+seq.seqname,
60 "--mc" if seq.mc
else "",
"--data" if seq.data
else "",
"--fast" if seq.fast
else "",
61 "--era" if seq.era
else "", seq.era,
62 "--eventcontent",
"DQM",
"--scenario" if seq.scenario
else "", seq.scenario,
63 "--datatier",
"DQMIO",
64 "--customise_commands",
'process.Tracer = cms.Service("Tracer")',
65 "--filein", INFILE,
"-n",
"0",
66 "--python_filename",
"cmssw_cfg.py",
"--no_exec"
69 driverargs = [x
for x
in driverargs
if x]
70 subprocess.check_call(driverargs, cwd=wd, stdout=2)
73 proc = subprocess.Popen([
"cmsRun",
"cmssw_cfg.py"], stderr=subprocess.STDOUT, stdout=subprocess.PIPE, cwd=wd, env=env)
74 tracedump, _ = proc.communicate()
77 if proc.returncode
and seq.step
not in (
"HARVESTING",
"ALCAHARVEST"):
78 raise Exception(
"cmsRun failed for cmsDriver command %s" % driverargs)
80 lines = tracedump.splitlines()
# Matches the Tracer output line that announces the construction of a module;
# group 1 is the module label (as bytes). A raw bytes literal is used so that
# "\w" reaches the regex engine as written instead of being an invalid
# string escape sequence.
labelre = re.compile(rb"[+]+ starting: constructing module with label '(\w+)'")
82 blacklistre = re.compile(BLACKLIST)
85 m = labelre.match(line)
87 label = m.group(1).
decode()
88 if blacklistre.match(label):
92 modules = set(modules)
95 configdump = subprocess.check_output([
"edmConfigDump",
"cmssw_cfg.py"], cwd=wd)
96 lines = configdump.splitlines()
# Matches the first line of a module definition in the edmConfigDump output;
# group 1 is the module label and group 2 the plugin name (both as bytes).
# A raw bytes literal is used so that "\(" reaches the regex engine as written
# instead of being an invalid string escape sequence.
modulere = re.compile(rb'process[.](.*) = cms.ED.*\("(.*)",')
105 modconfig[inconfig] += b
'\n' + line
110 m = modulere.match(line)
112 label = m.group(1).
decode()
113 plugin = m.group(2).
decode()
115 modclass[label] = plugin
116 modconfig[label] = line
120 plugininfo = tp.map(getplugininfo, modclass.values())
125 return modconfig, modclass, dict(plugininfo)
129 @functools.lru_cache(maxsize=
None)
131 plugindump = subprocess.check_output([
"edmPluginHelp",
"-p", pluginname])
132 line = plugindump.splitlines()[0].
decode()
134 pluginre = re.compile(
".* " + pluginname +
".*[(]((\w+)::)?(\w+)[)]")
135 m = pluginre.match(line)
138 return (pluginname, (
"",
""))
140 return (pluginname, (m.group(2), m.group(3)))
145 for label
in modclass.keys():
150 row.append(modclass[label])
152 row.append(
"::".
join(plugininfo[modclass[label]]))
154 row.append(modconfig[label].
decode())
155 out.append(tuple(row))
156 for row
in sorted(set(out)):
# SQL fragments derived from the Sequence record: the comma-separated column
# names, and one "?" placeholder per column for parameterized statements.
SEQFIELDS = str.join(",", Sequence._fields)
SEQPLACEHOLDER = ",".join(["?"] * len(Sequence._fields))
164 CREATE TABLE IF NOT EXISTS plugin(classname, edmfamily, edmbase);
165 CREATE UNIQUE INDEX IF NOT EXISTS plugins ON plugin(classname);
166 CREATE TABLE IF NOT EXISTS module(id INTEGER PRIMARY KEY, classname, instancename, variation, config);
167 CREATE UNIQUE INDEX IF NOT EXISTS modules ON module(instancename, variation);
168 CREATE UNIQUE INDEX IF NOT EXISTS configs ON module(config);
169 CREATE TABLE IF NOT EXISTS sequence(id INTEGER PRIMARY KEY, {SEQFIELDS});
170 CREATE UNIQUE INDEX IF NOT EXISTS squences ON sequence({SEQFIELDS});
171 CREATE TABLE IF NOT EXISTS workflow(wfid, sequenceid);
172 CREATE UNIQUE INDEX IF NOT EXISTS wrokflows ON workflow(sequenceid, wfid);
173 CREATE TABLE IF NOT EXISTS sequencemodule(moduleid, sequenceid);
177 with sqlite3.connect(DBFILE)
as db:
179 cur.executescript(DBSCHEMA)
181 seqid = list(cur.execute(f
"SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER});", (seq)))
185 cur.execute(
"BEGIN;")
187 cur.execute(
"CREATE TEMP TABLE newmodules(instancename, classname, config);")
188 cur.executemany(
"INSERT INTO newmodules VALUES (?, ?, ?)", ((label, modclass[label], modconfig[label])
for label
in modconfig))
191 INSERT OR IGNORE INTO module(classname, instancename, variation, config)
192 SELECT classname, instancename,
193 (SELECT count(*) FROM module AS existing WHERE existing.instancename = newmodules.instancename),
194 config FROM newmodules;
198 cur.executemany(
"INSERT OR IGNORE INTO plugin VALUES (?, ?, ?);", ((plugin, edm[0], edm[1])
for plugin, edm
in plugininfo.items()))
200 cur.execute(f
"INSERT OR FAIL INTO sequence({SEQFIELDS}) VALUES({SEQPLACEHOLDER});", (seq))
201 seqid = list(cur.execute(f
"SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER});", (seq)))
203 cur.executemany(
"INSERT INTO sequencemodule SELECT id, ? FROM module WHERE config = ?;", ((seqid, modconfig[label])
for label
in modconfig))
204 cur.execute(
"COMMIT;")
207 with sqlite3.connect(DBFILE)
as db:
209 cur.execute(
"BEGIN;")
210 cur.executescript(DBSCHEMA)
211 pairs = [[wf] + list(seq)
for wf, seqlist
in seqs.items()
for seq
in seqlist]
212 cur.executemany(f
"INSERT OR IGNORE INTO workflow SELECT ?, (SELECT id FROM sequence WHERE ({SEQFIELDS}) = ({SEQPLACEHOLDER}));", pairs)
213 cur.execute(
"COMMIT;")
221 sequences = defaultdict(list)
224 stepdump = subprocess.check_output([
"runTheMatrix.py",
"-l",
str(wfnumber),
"-ne"])
226 stepdump = subprocess.check_output([
"runTheMatrix.py",
"-ne"])
228 lines = stepdump.splitlines()
# Matches a line that starts with a workflow number of the form
# "<digits>.<digits>" followed by a space; group 1 is the number (as bytes).
# The dot is escaped so that only a literal decimal point is accepted --
# unescaped it matched any character, so a plain integer such as "12345 "
# was wrongly taken for a workflow number.
workflowre = re.compile(rb"^([0-9]+\.[0-9]+) ")
233 m = workflowre.match(line)
235 workflow = m.group(1).
decode()
239 if not b
'cmsDriver.py' in line:
continue
241 args = list(reversed(line.decode().
split(
" ")))
252 if item ==
'--scenario':
253 scenario = args.pop()
262 steps = step.split(
",")
264 s = step.split(
":")[0]
265 if s
in RELEVANTSTEPS:
268 seqs = step.split(
":")[1]
269 for seq
in seqs.split(
"+"):
270 sequences[workflow].
append(
Sequence(seq, s, era, scenario, mc, data, fast))
272 sequences[workflow].
append(
Sequence(
"", s, era, scenario, mc, data, fast))
277 tasks = [stp.map_async(
lambda seq: (seq,
inspectsequence(seq)), [seq])
for seq
in seqs]
291 if not t.successful():
292 print(
"Task failed.")
304 db = sqlite3.connect(DBFILE)
307 return (seq.step +
":" + seq.seqname +
" " + seq.era +
" " + seq.scenario
308 + (
" --mc" if seq.mc
else "") + (
" --data" if seq.data
else "")
309 + (
" --fast" if seq.fast
else ""))
314 out.append(
"<H2>Sequences</H2><ul>")
# Explanatory paragraph of the HTML page (typo "apssed" corrected to "passed").
out.append(
    """<p> A sequence name, given as <em>STEP:@sequencename</em> here, does not uniquely identify a sequence.
The modules on the sequence might depend on other cmsDriver options, such as Era, Scenario, Data vs. MC, etc.
This tool lists parameter combinations that were observed. However, sequences with identical contents are grouped
on this page. The default sequence, used when no explicit sequence is passed to cmsDriver, is noted as <em>STEP:</em>.</p>""")
319 rows = cur.execute(f
"SELECT seqname, step, count(*) FROM sequence GROUP BY seqname, step ORDER BY seqname, step;")
321 seqname, step, count = row
323 out += showseq(step, seqname)
324 out.append(f
' </li>')
327 out.append(
"<H2>Modules</H2><ul>")
328 rows = cur.execute(f
"SELECT classname, edmfamily, edmbase FROM plugin ORDER BY edmfamily, edmbase, classname")
330 classname, edmfamily, edmbase = row
331 if not edmfamily: edmfamily =
"<em>legacy</em>"
332 out.append(f
' <li>{edmfamily}::{edmbase} <a href="/plugin/{classname}/">{classname}</a></li>')
336 def showseq(step, seqname):
340 out.append(f
' <a href="/seq/{step}:{seqname}/">{step}:{seqname}</a>')
344 rows = cur.execute(f
"SELECT {SEQFIELDS}, moduleid, id FROM sequence INNER JOIN sequencemodule ON sequenceid = id WHERE seqname = ? and step = ?;", (seqname, step))
346 seqs = defaultdict(list)
353 variations = defaultdict(list)
354 for seq, mods
in seqs.items():
355 variations[tuple(sorted(mods))].
append(seq)
358 for mods, seqs
in variations.items():
360 out.append(f
' <li>({count} modules):')
363 out.append(f
'<br><a href="/seqid/{seqid}">' + formatseq(seq) +
'</a>')
365 rows = cur.execute(
"SELECT wfid FROM workflow WHERE sequenceid = ?;", (seqid,))
366 out.append(f
'<em>Used on workflows: ' +
", ".
join(wfid
for wfid,
in rows) +
"</em>")
371 def showseqid(seqid):
376 rows = cur.execute(f
"SELECT {SEQFIELDS} FROM sequence WHERE id = ?;", (seqid,))
377 seq = formatseq(
Sequence(*list(rows)[0]))
378 out.append(f
"<h2>Modules on {seq}:</h2><ul>")
379 rows = cur.execute(
"SELECT wfid FROM workflow WHERE sequenceid = ?;", (seqid,))
380 out.append(
"<p><em>Used on workflows: " +
", ".
join(wfid
for wfid,
in rows) +
"</em></p>")
381 rows = cur.execute(
"""
382 SELECT classname, instancename, variation, moduleid
383 FROM sequencemodule INNER JOIN module ON moduleid = module.id
384 WHERE sequenceid = ?;""", (seqid,))
386 classname, instancename, variation, moduleid = row
387 out.append(f
'<li>{instancename} ' + (f
'<sub>{variation}</sub>' if variation
else '') + f
' : <a href="/plugin/{classname}/">{classname}</a></li>')
392 def showclass(classname):
397 out.append(f
"<h2>Plugin {classname}</h2>")
400 rows = cur.execute(
"SELECT edmfamily, edmbase FROM plugin WHERE classname = ?;", (classname,))
401 edmfamily, edmbase = list(rows)[0]
402 islegcay =
not edmfamily
403 if islegcay: edmfamily =
"<em>legacy</em>"
404 out.append(f
"<p>{classname} is a <b>{edmfamily}::{edmbase}</b>.</p>")
405 out.append(
"""<p>A module with a given label can have different configuration depending on options such as Era,
406 Scenario, Data vs. MC etc. If multiple configurations for the same name were found, they are listed separately
407 here and denoted using subscripts.</p>""")
408 if (edmbase !=
"EDProducer" and not (islegcay
and edmbase ==
"EDAnalyzer"))
or (islegcay
and edmbase ==
"EDProducer"):
409 out.append(f
"<p>This is not a DQM module.</p>")
412 rows = cur.execute(
"""
413 SELECT module.id, instancename, variation, sequenceid, step, seqname
414 FROM module INNER JOIN sequencemodule ON moduleid = module.id INNER JOIN sequence ON sequence.id == sequenceid
415 WHERE classname = ? ORDER BY instancename, variation, step, seqname;""", (classname,))
417 seqsformod = defaultdict(list)
420 id, instancename, variation, sequenceid, step, seqname = row
421 liformod[id] = f
'<a href="/config/{id}">{instancename}' + (f
"<sub>{variation}</sub>" if variation
else '') +
"</a>"
422 seqsformod[id].
append((sequenceid, f
"{step}:{seqname}"))
423 for id, li
in liformod.items():
424 out.append(
"<li>" + li +
' Used here: ' +
", ".
join(f
'<a href="/seqid/{seqid}">{name}</a>' for seqid, name
in seqsformod[id]) +
'.</li>')
428 def showconfig(modid):
433 rows = cur.execute(f
"SELECT config FROM module WHERE id = ?;", (modid,))
434 config = list(rows)[0][0]
436 out.append(config.decode())
441 (re.compile(
'/$'), index),
442 (re.compile(
'/seq/(\w+):([@\w]*)/$'), showseq),
443 (re.compile(
'/seqid/(\d+)$'), showseqid),
444 (re.compile(
'/config/(\d+)$'), showconfig),
445 (re.compile(
'/plugin/(.*)/$'), showclass),
449 class Handler(http.server.SimpleHTTPRequestHandler):
453 for pattern, func
in ROUTES:
454 m = pattern.match(self.path)
460 self.send_response(200,
"Here you go")
461 self.send_header(
"Content-Type",
"text/html; charset=utf-8")
463 self.wfile.
write(b
"""<html><style>
468 self.wfile.
write(res)
469 self.wfile.
write(b
"</body></html>")
471 self.send_response(400,
"Something went wrong")
472 self.send_header(
"Content-Type",
"text/plain; charset=utf-8")
474 self.wfile.
write(b
"I don't understand this request.")
476 trace = traceback.format_exc()
477 self.send_response(500,
"Things went very wrong")
478 self.send_header(
"Content-Type",
"text/plain; charset=utf-8")
480 self.wfile.
write(trace.encode(
"utf8"))
482 server_address = (
'', 8000)
483 httpd = http.server.HTTPServer(server_address, Handler)
484 print(
"Serving at http://localhost:8000/ ...")
485 httpd.serve_forever()
488 if __name__ ==
"__main__":
# Command line interface of the tool. Option names, defaults and types are
# what the rest of the script reads from the parsed namespace.
parser = argparse.ArgumentParser(description='Collect information about DQM sequences.')

# Description of a single sequence to inspect.
parser.add_argument("--sequence", default="", help="Name of the sequence")
parser.add_argument("--step", default="DQM", help="cmsDriver step that the sequence applies to")
parser.add_argument("--era", default="Run2_2018", help="CMSSW Era to use")
parser.add_argument("--scenario", default="pp", help="cmsDriver scenario")
parser.add_argument("--data", default=False, action="store_true", help="Pass --data to cmsDriver.")
parser.add_argument("--mc", default=False, action="store_true", help="Pass --mc to cmsDriver.")
parser.add_argument("--fast", default=False, action="store_true", help="Pass --fast to cmsDriver.")

# Bulk modes that take their sequences from runTheMatrix instead.
parser.add_argument("--workflow", default=None, help="Ignore other options and inspect this workflow instead (implies --sqlite).")
parser.add_argument("--runTheMatrix", default=False, action="store_true", help="Ignore other options and inspect the full matrix instead (implies --sqlite).")
parser.add_argument("--steps", default="ALCA,ALCAPRODUCER,ALCAHARVEST,DQM,HARVESTING,VALIDATION", help="Which workflow steps to inspect from runTheMatrix.")

# Output destination and job splitting.
parser.add_argument("--sqlite", default=False, action="store_true", help="Write information to SQLite DB instead of stdout.")
parser.add_argument("--dbfile", default="sequences.db", help="Name of the DB file to use.")
parser.add_argument("--threads", default=None, type=int, help="Use a fixed number of threads (default is #cores).")
parser.add_argument("--limit", default=None, type=int, help="Process only this many sequences.")
parser.add_argument("--offset", default=None, type=int, help="Process sequences starting from this index. Used with --limit to divide the work into jobs.")

# Columns to print when writing to stdout.
parser.add_argument("--showpluginlabel", default=False, action="store_true", help="Print the module label for each plugin (default).")
parser.add_argument("--showplugintype", default=False, action="store_true", help="Print the base class for each plugin.")
parser.add_argument("--showpluginclass", default=False, action="store_true", help="Print the class name for each plugin.")
parser.add_argument("--showpluginconfig", default=False, action="store_true", help="Print the config dump for each plugin.")

# Web UI mode.
parser.add_argument("--serve", default=False, action="store_true", help="Ignore other options and instead serve HTML UI from SQLite DB.")
513 args = parser.parse_args()
515 RELEVANTSTEPS += args.steps.split(
",")
519 tp = ThreadPool(args.threads)
520 stp = ThreadPool(args.threads)
524 elif args.workflow
or args.runTheMatrix:
527 seqset = set(sum(seqs.values(), []))
529 seqset = list(sorted(seqset))[args.offset:]
531 seqset = list(sorted(seqset))[:args.limit]
533 print(
"Analyzing %d seqs..." % len(seqset))
539 seq =
Sequence(args.sequence, args.step, args.era, args.scenario, args.mc, args.data, args.fast)
545 if not (args.showpluginlabel
or args.showpluginclass
or args.showplugintype
or args.showpluginconfig):
546 args.showpluginlabel =
True
547 formatsequenceinfo(modconfig, modclass, plugininfo, args.showpluginlabel, args.showpluginclass, args.showplugintype, args.showpluginconfig)