20 log_format =
'%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s' 21 logging.basicConfig(format=log_format, level=logging.INFO)
22 root_log = logging.getLogger()
35 with open(fp,
"wb")
as f:
38 self.log.info(
"Written control file: %s", fp)
42 with open(fp,
"rb")
as f:
47 return logging.getLogger(self.
name)
57 libc = ctypes.CDLL(
"libc.so.6")
59 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
64 re_pattern = re.compile(
r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')
72 for f
in os.listdir(self.
input):
73 r = self.re_pattern.match(f)
75 run, lumi, stream, stream_source = r.groups()
76 run, lumi =
int(run),
int(lumi)
80 elif run_found != run:
81 raise Exception(
"Files from multiple runs are not (yet) supported for as playback input.")
83 lumi_dct = self.lumi_found.setdefault(lumi, {
'streams': {} })
84 lumi_dct[
"streams"][stream] = (f, stream_source)
86 streams_found.add(stream)
89 raise Exception(
"Playback files not found.")
94 self.log.info(
"Found run %s, will map output to run %s", run_found, self.
run)
95 self.log.info(
"Found %d lumisections with %d files", len(self.
lumi_found), len(files_found))
96 self.log.info(
"Found %d streams: %s", len(streams_found),
list(streams_found))
99 self.lumi_order.sort()
106 self.
run = self.opts.run
107 self.log.info(
"Using input directory: %s", self.
input)
112 if not os.path.isdir(self.
output):
114 self.log.info(
"Using output directory: %s", self.
output)
119 f.write(
"run_key = pp_run")
130 self.log.info(
"Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)
133 streams = lumi_dct[
"streams"]
136 return os.path.join(self.
input, f)
139 return os.path.join(self.
output, f)
141 written_files = set()
142 for stream, v
in streams.items():
143 jsn_orig_fn, stream_source = v
144 jsn_play_fn =
"run%06d_ls%04d_stream%s_%s.jsn" % (self.
run, play_lumi, stream, stream_source)
148 if stream.startswith(
"streamDQMHistograms"):
150 dat_play_fn =
"run%06d_ls%04d_stream%s_%s.%s" % (self.
run, play_lumi, stream, stream_source, ext)
153 with open(ijoin(jsn_orig_fn),
'r') as f: 154 jsn_data = json.load(f) 155 dat_orig_fn = jsn_data["data"][3]
158 if os.path.exists(ijoin(dat_orig_fn)):
159 self.log.info(
"C: %s -> %s", dat_orig_fn, dat_play_fn)
160 shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
162 written_files.add(dat_play_fn)
164 log.warning(
"Dat file is missing: %s", dat_orig_fn)
168 jsn_data[
"data"][3] = dat_play_fn
170 f = tempfile.NamedTemporaryFile(prefix=jsn_play_fn+
".", suffix=
".tmp", dir = self.
output, delete=
False)
172 json.dump(jsn_data, f)
175 os.rename(tmp_fp, ojoin(jsn_play_fn))
176 written_files.add(jsn_play_fn)
178 self.log.info(
"Copied %d files for lumi %06d", len(written_files), play_lumi)
180 self.lumi_backlog.append((play_lumi, written_files))
182 old_lumi, files_to_delete = self.lumi_backlog.popleft()
184 self.log.info(
"Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
185 for f
in files_to_delete:
196 if (now - last_write) > self.opts.playback_time_lumi:
199 if self.opts.playback_nlumi > -1
and lumi_produced >= self.opts.playback_nlumi:
206 eor_fn =
"run%06d_ls0000_EoR.jsn" % (self.
run, )
207 eor_fp = os.path.join(self.
output, eor_fn)
208 with open(eor_fp,
"w"):
211 self.log.info(
"Wrote EoR: %s", eor_fp)
217 logname="/var/log/hltd/pid/hlt_run$4_pid$$.log" 218 lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz" 219 #override the noclobber option by using >| operator for redirection - then keep appending to log 220 echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname 221 export http_proxy="http://cmsproxy.cms:3128" 222 export https_proxy="https://cmsproxy.cms:3128/" 223 export NO_PROXY=".cms" 227 source cmsset_default.sh >> $logname 231 eval `scram runtime -sh`; 234 #exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1 235 exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 238 start_dqm_job = start_dqm_job.replace(
"/var/log/hltd/pid/",
'{log_path}/')
239 start_dqm_job = start_dqm_job.replace(
" cmsRun ",
' {cmsRun} ')
242 RunDesc = collections.namedtuple(
'Run', [
'run',
'run_fp',
'global_fp',
'global_param'])
243 RunState = collections.namedtuple(
'RunState', [
'desc',
'proc'])
248 x = re.sub(
r'(.*)\.py',
r'\1', x)
249 x = re.sub(
r'(.*)_cfg',
r'\1', x)
250 x = re.sub(
r'(.*)-live',
r'\1', x)
251 x = re.sub(
r'(.*)_sourceclient',
r'\1', x)
252 x = re.sub(
r'(.*)_dqm',
r'\1', x)
254 x =
"".
join([c
for c
in x
if c.isalnum()])
259 fp = os.path.realpath(self.
cfg_file)
261 bn = os.path.basename(fp)
262 fp = os.path.dirname(fp)
268 raise Exception(
"Could not find the cmssw release area.")
274 self.
home_path = os.path.join(self.opts.work_home,
"%s_%s" % (self.
name, hex(
id(self))))
279 self.log.info(
"logs path: %s", self.
log_path)
282 self.log.info(
"Creating: %s", self.
exec_file)
284 template = start_dqm_job
285 body = template.format(log_path=self.
log_path, cmsRun=self.opts.cmsRun)
290 cmsset_globs = [
"/afs/cern.ch/cms/cmsset_default.sh",
"/home/dqm*local/base/cmsset_default.sh"]
292 for t
in cmsset_globs:
298 if cmsset_target
is not None:
299 base = os.path.join(self.
home_path,
"base")
302 cmsset_link = os.path.join(base,
"cmsset_default.sh")
303 self.log.info(
"Linking : %s -> %s", cmsset_link, cmsset_target)
304 os.symlink(cmsset_target, cmsset_link)
306 self.log.warning(
"Couldn't find cmsset_default.sh, source it yourself!")
308 current_link = os.path.join(self.
home_path,
"current")
310 self.log.info(
"Linking : %s -> %s", current_link, target)
311 os.symlink(target, current_link)
317 self.log.error(
"Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
320 output_link = os.path.join(self.
home_path,
"output")
321 output_target = os.path.realpath(self.opts.work_output)
322 target = os.path.relpath(output_target, self.
home_path)
323 self.log.info(
"Linking : %s -> %s", output_link, target)
324 os.symlink(target, output_link)
329 self.log.info(
"Linking : %s -> %s", cfg_link, target)
330 os.symlink(target, cfg_link)
340 if not os.path.isfile(self.
cfg_file):
355 args.append(
"slc6_amd64_gcc491")
357 args.append(
str(run))
360 args.append(
"runkey=pp_run")
365 re_run = re.compile(
r'run([0-9]+)')
366 re_global = re.compile(
r'\.run([0-9]+)\.global')
371 for x
in os.listdir(self.
ramdisk):
374 runs[
int(m.group(1))] = x
376 m = re_global.match(x)
378 globals[
int(m.group(1))] = x
381 run_set = set(runs.keys())
382 run_set = run_set.intersection(globals.keys())
384 if self.opts.run < 0:
385 largest =
max(run_set)
387 largest = self.opts.run
390 global_fp = os.path.join(self.
ramdisk, globals[largest])
391 with open(global_fp,
"r") as f: 392 global_param = f.read() 396 run_fp = os.path.join(self.
ramdisk, runs[largest]),
397 global_fp = global_fp,
398 global_param = global_param,
410 self.log.info(
"Executing: %s",
" ".
join(args))
411 proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
421 if self.
current_state is None or latest != self.current_state.desc:
422 self.log.info(
"Found latest run: %s", latest)
427 self.log.info(
"Run not found, waiting 1 sec.")
429 r = self.current_state.proc.poll()
431 self.log.info(
"Process exitted: %s", r)
438 if __name__ ==
"__main__":
439 if len(sys.argv) == 2
and sys.argv[-1].endswith(
".pkl"):
444 sys.exit(ret
if ret
else 0)
447 subdirectories = [
"ramdisk",
"output",
"control",
"home",
"logs",
"dqm_monitoring"]
448 username = getpass.getuser()
454 parser.add_argument(
"--work",
"-w", type=str, help=
"Working directory (used for inputs,outputs,monitoring and logs).", default=
"/tmp/pplay." + username)
455 parser.add_argument(
"--clean",
"-c", action=
"store_true", help=
"Clean work directories (if they are not set).", default=
False)
456 parser.add_argument(
"--dry",
"-n", action=
"store_true", help=
"Do not execute, just init.", default=
False)
458 work_group = parser.add_argument_group(
'Paths',
'Path options for cmssw jobs, auto generated if not specified.')
459 for subdirectory
in subdirectories:
460 work_group.add_argument(
"--work_%s" % subdirectory, type=str, help=
"Path for %s directory." % subdirectory, default=
None)
462 playback_group = parser.add_argument_group(
'Playback',
'Playback configuration/parameters.')
463 playback_group.add_argument(
"--playback",
"-p", type=str, metavar=
"PLAYBACK_INPUT_DIR", help=
"Enable playback (emulate file delivery, otherwise set work_input).", default=
None)
464 playback_group.add_argument(
"--playback_nlumi", type=int, help=
"Number of lumis to deliver, -1 for forever.", default=-1)
465 playback_group.add_argument(
"--playback_time_lumi", type=float, help=
"Number of seconds between lumisections.", default=23.3)
467 run_group = parser.add_argument_group(
'Run',
'Run configuration/parameters.')
468 run_group.add_argument(
"--run", type=int, help=
"Run number, -1 for autodiscovery.", default=-1)
469 run_group.add_argument(
"--cmsRun", type=str, help=
"cmsRun command to run, for igprof and so on.", default=
"cmsRun")
471 parser.add_argument(
'cmssw_configs', metavar=
'cmssw_cfg.py', type=str, nargs=
'*', help=
'List of cmssw jobs (clients).')
473 args = parser.parse_args()
475 if len(args.cmssw_configs)
and args.cmssw_configs[0] ==
"--":
477 args.cmssw_configs = args.cmssw_configs[1:]
479 for subdirectory
in subdirectories:
480 if getattr(args,
"work_" + subdirectory)
is None:
481 setattr(args,
"work_" + subdirectory, os.path.join(args.work, subdirectory))
483 path = getattr(args,
"work_" + subdirectory)
484 if args.clean
and os.path.isdir(path):
485 root_log.info(
"Removing directory: %s", path)
488 path = getattr(args,
"work_" + subdirectory)
489 if not os.path.isdir(path):
492 root_log.info(
"Using directory: %s", path)
502 playback =
Playback(
"playback_emu", opts=args)
503 applets.append(playback)
505 for cfg
in args.cmssw_configs:
506 cfg_a =
FrameworkJob(
"framework_job", opts=args, cfg_file=cfg)
507 applets.append(cfg_a)
509 if len(applets) == 0:
510 sys.stderr.write(
"At least one process should be specified, use --playback and/or cmssw_configs options.\n")
514 fn =
"%s_%s.pkl" % (a.name, hex(
id(a)))
515 a.write(os.path.join(args.work_control, fn))
524 args = [os.path.realpath(__file__), fp]
525 a.control_proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
529 a.control_proc.wait()
def preexec_kill_on_pdeath()
def __init__(self, name, opts, kwargs)
def discover_latest(self)
static std::string join(char **cmd)
def start_run(self, current)
How EventSelector::AcceptEvent() decides whether to accept an event for output: For a single or multiple positive criteria, the trigger will pass if any such matching triggers are PASS or EXCEPTION. [A criterion that matches no triggers at all is detected and causes a throw.] For a single negative criterion, the trigger will pass if any such matching triggers are FAIL or EXCEPTION. A wildcarded negative criterion that matches more than one trigger in the trigger list ("!*", or "!HLTx*" if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL. It will reject the event if any of the triggers are PASS or EXCEPTION (this matches the behavior of "!*" before the partial wildcard feature was incorporated). Triggers which are in the READY state are completely ignored. (READY should never be returned, since the trigger paths have been run.)