3 from __future__
import print_function
21 log_format =
'%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s' 22 logging.basicConfig(format=log_format, level=logging.INFO)
23 root_log = logging.getLogger()
36 with open(fp,
"wb")
as f:
39 self.log.info(
"Written control file: %s", fp)
43 with open(fp,
"rb")
as f:
48 return logging.getLogger(self.
name)
58 libc = ctypes.CDLL(
"libc.so.6")
60 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
65 re_pattern = re.compile(
r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')
73 for f
in os.listdir(self.
input):
74 r = self.re_pattern.match(f)
76 run, lumi, stream, stream_source = r.groups()
77 run, lumi =
int(run),
int(lumi)
81 elif run_found != run:
82 raise Exception(
"Files from multiple runs are not (yet) supported for as playback input.")
84 lumi_dct = self.lumi_found.setdefault(lumi, {
'streams': {} })
85 lumi_dct[
"streams"][stream] = (f, stream_source)
87 streams_found.add(stream)
90 raise Exception(
"Playback files not found.")
95 self.log.info(
"Found run %s, will map output to run %s", run_found, self.
run)
96 self.log.info(
"Found %d lumisections with %d files", len(self.
lumi_found), len(files_found))
97 self.log.info(
"Found %d streams: %s", len(streams_found),
list(streams_found))
100 self.lumi_order.sort()
107 self.
run = self.opts.run
108 self.log.info(
"Using input directory: %s", self.
input)
113 if not os.path.isdir(self.
output):
115 self.log.info(
"Using output directory: %s", self.
output)
120 f.write(
"run_key = pp_run")
131 self.log.info(
"Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)
134 streams = lumi_dct[
"streams"]
137 return os.path.join(self.
input, f)
140 return os.path.join(self.
output, f)
142 written_files = set()
143 for stream, v
in streams.items():
144 jsn_orig_fn, stream_source = v
145 jsn_play_fn =
"run%06d_ls%04d_stream%s_%s.jsn" % (self.
run, play_lumi, stream, stream_source)
149 if stream.startswith(
"streamDQMHistograms"):
151 dat_play_fn =
"run%06d_ls%04d_stream%s_%s.%s" % (self.
run, play_lumi, stream, stream_source, ext)
154 with open(ijoin(jsn_orig_fn),
'r') as f: 155 jsn_data = json.load(f) 156 dat_orig_fn = jsn_data["data"][3]
159 if os.path.exists(ijoin(dat_orig_fn)):
160 self.log.info(
"C: %s -> %s", dat_orig_fn, dat_play_fn)
161 shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
163 written_files.add(dat_play_fn)
165 log.warning(
"Dat file is missing: %s", dat_orig_fn)
169 jsn_data[
"data"][3] = dat_play_fn
171 f = tempfile.NamedTemporaryFile(prefix=jsn_play_fn+
".", suffix=
".tmp", dir = self.
output, delete=
False)
173 json.dump(jsn_data, f)
176 os.rename(tmp_fp, ojoin(jsn_play_fn))
177 written_files.add(jsn_play_fn)
179 self.log.info(
"Copied %d files for lumi %06d", len(written_files), play_lumi)
181 self.lumi_backlog.append((play_lumi, written_files))
183 old_lumi, files_to_delete = self.lumi_backlog.popleft()
185 self.log.info(
"Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
186 for f
in files_to_delete:
197 if (now - last_write) > self.opts.playback_time_lumi:
200 if self.opts.playback_nlumi > -1
and lumi_produced >= self.opts.playback_nlumi:
207 eor_fn =
"run%06d_ls0000_EoR.jsn" % (self.
run, )
208 eor_fp = os.path.join(self.
output, eor_fn)
209 with open(eor_fp,
"w"):
212 self.log.info(
"Wrote EoR: %s", eor_fp)
218 logname="/var/log/hltd/pid/hlt_run$4_pid$$.log" 219 lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz" 220 #override the noclobber option by using >| operator for redirection - then keep appending to log 221 echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname 222 export http_proxy="http://cmsproxy.cms:3128" 223 export https_proxy="https://cmsproxy.cms:3128/" 224 export NO_PROXY=".cms" 228 source cmsset_default.sh >> $logname 232 eval `scram runtime -sh`; 235 #exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1 236 exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 239 start_dqm_job = start_dqm_job.replace(
"/var/log/hltd/pid/",
'{log_path}/')
240 start_dqm_job = start_dqm_job.replace(
" cmsRun ",
' {cmsRun} ')
243 RunDesc = collections.namedtuple(
'Run', [
'run',
'run_fp',
'global_fp',
'global_param'])
244 RunState = collections.namedtuple(
'RunState', [
'desc',
'proc'])
249 x = re.sub(
r'(.*)\.py',
r'\1', x)
250 x = re.sub(
r'(.*)_cfg',
r'\1', x)
251 x = re.sub(
r'(.*)-live',
r'\1', x)
252 x = re.sub(
r'(.*)_sourceclient',
r'\1', x)
253 x = re.sub(
r'(.*)_dqm',
r'\1', x)
255 x =
"".
join([c
for c
in x
if c.isalnum()])
260 fp = os.path.realpath(self.
cfg_file)
262 bn = os.path.basename(fp)
263 fp = os.path.dirname(fp)
269 raise Exception(
"Could not find the cmssw release area.")
275 self.
home_path = os.path.join(self.opts.work_home,
"%s_%s" % (self.
name, hex(
id(self))))
280 self.log.info(
"logs path: %s", self.
log_path)
283 self.log.info(
"Creating: %s", self.
exec_file)
285 template = start_dqm_job
286 body = template.format(log_path=self.
log_path, cmsRun=self.opts.cmsRun)
291 cmsset_globs = [
"/afs/cern.ch/cms/cmsset_default.sh",
"/home/dqm*local/base/cmsset_default.sh"]
293 for t
in cmsset_globs:
299 if cmsset_target
is not None:
300 base = os.path.join(self.
home_path,
"base")
303 cmsset_link = os.path.join(base,
"cmsset_default.sh")
304 self.log.info(
"Linking : %s -> %s", cmsset_link, cmsset_target)
305 os.symlink(cmsset_target, cmsset_link)
307 self.log.warning(
"Couldn't find cmsset_default.sh, source it yourself!")
309 current_link = os.path.join(self.
home_path,
"current")
311 self.log.info(
"Linking : %s -> %s", current_link, target)
312 os.symlink(target, current_link)
318 self.log.error(
"Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
321 output_link = os.path.join(self.
home_path,
"output")
322 output_target = os.path.realpath(self.opts.work_output)
323 target = os.path.relpath(output_target, self.
home_path)
324 self.log.info(
"Linking : %s -> %s", output_link, target)
325 os.symlink(target, output_link)
330 self.log.info(
"Linking : %s -> %s", cfg_link, target)
331 os.symlink(target, cfg_link)
341 if not os.path.isfile(self.
cfg_file):
356 args.append(
"slc6_amd64_gcc491")
358 args.append(
str(run))
361 args.append(
"runkey=pp_run")
366 re_run = re.compile(
r'run([0-9]+)')
367 re_global = re.compile(
r'\.run([0-9]+)\.global')
372 for x
in os.listdir(self.
ramdisk):
375 runs[
int(m.group(1))] = x
377 m = re_global.match(x)
379 globals[
int(m.group(1))] = x
382 run_set = set(runs.keys())
383 run_set = run_set.intersection(globals.keys())
385 if self.opts.run < 0:
386 largest =
max(run_set)
388 largest = self.opts.run
391 global_fp = os.path.join(self.
ramdisk, globals[largest])
392 with open(global_fp,
"r") as f: 393 global_param = f.read() 397 run_fp = os.path.join(self.
ramdisk, runs[largest]),
398 global_fp = global_fp,
399 global_param = global_param,
411 self.log.info(
"Executing: %s",
" ".
join(args))
412 proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
422 if self.
current_state is None or latest != self.current_state.desc:
423 self.log.info(
"Found latest run: %s", latest)
428 self.log.info(
"Run not found, waiting 1 sec.")
430 r = self.current_state.proc.poll()
432 self.log.info(
"Process exitted: %s", r)
439 if __name__ ==
"__main__":
440 if len(sys.argv) == 2
and sys.argv[-1].endswith(
".pkl"):
445 sys.exit(ret
if ret
else 0)
448 subdirectories = [
"ramdisk",
"output",
"control",
"home",
"logs",
"dqm_monitoring"]
449 username = getpass.getuser()
451 parser = argparse.ArgumentParser(description=
"Emulate DQM@P5 environment and launch cmssw jobs.")
455 parser.add_argument(
"--work",
"-w", type=str, help=
"Working directory (used for inputs,outputs,monitoring and logs).", default=
"/tmp/pplay." + username)
456 parser.add_argument(
"--clean",
"-c", action=
"store_true", help=
"Clean work directories (if they are not set).", default=
False)
457 parser.add_argument(
"--dry",
"-n", action=
"store_true", help=
"Do not execute, just init.", default=
False)
459 work_group = parser.add_argument_group(
'Paths',
'Path options for cmssw jobs, auto generated if not specified.')
460 for subdirectory
in subdirectories:
461 work_group.add_argument(
"--work_%s" % subdirectory, type=str, help=
"Path for %s directory." % subdirectory, default=
None)
463 playback_group = parser.add_argument_group(
'Playback',
'Playback configuration/parameters.')
464 playback_group.add_argument(
"--playback",
"-p", type=str, metavar=
"PLAYBACK_INPUT_DIR", help=
"Enable playback (emulate file delivery, otherwise set work_input).", default=
None)
465 playback_group.add_argument(
"--playback_nlumi", type=int, help=
"Number of lumis to deliver, -1 for forever.", default=-1)
466 playback_group.add_argument(
"--playback_time_lumi", type=float, help=
"Number of seconds between lumisections.", default=23.3)
468 run_group = parser.add_argument_group(
'Run',
'Run configuration/parameters.')
469 run_group.add_argument(
"--run", type=int, help=
"Run number, -1 for autodiscovery.", default=-1)
470 run_group.add_argument(
"--cmsRun", type=str, help=
"cmsRun command to run, for igprof and so on.", default=
"cmsRun")
472 parser.add_argument(
'cmssw_configs', metavar=
'cmssw_cfg.py', type=str, nargs=
'*', help=
'List of cmssw jobs (clients).')
474 args = parser.parse_args()
476 if len(args.cmssw_configs)
and args.cmssw_configs[0] ==
"--":
478 args.cmssw_configs = args.cmssw_configs[1:]
480 for subdirectory
in subdirectories:
481 if getattr(args,
"work_" + subdirectory)
is None:
482 setattr(args,
"work_" + subdirectory, os.path.join(args.work, subdirectory))
484 path = getattr(args,
"work_" + subdirectory)
485 if args.clean
and os.path.isdir(path):
486 root_log.info(
"Removing directory: %s", path)
489 path = getattr(args,
"work_" + subdirectory)
490 if not os.path.isdir(path):
493 root_log.info(
"Using directory: %s", path)
504 applets.append(playback)
506 for cfg
in args.cmssw_configs:
508 applets.append(cfg_a)
510 if len(applets) == 0:
511 sys.stderr.write(
"At least one process should be specified, use --playback and/or cmssw_configs options.\n")
515 fn =
"%s_%s.pkl" % (a.name, hex(
id(a)))
516 a.write(os.path.join(args.work_control, fn))
525 args = [os.path.realpath(__file__), fp]
526 a.control_proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
530 a.control_proc.wait()
def preexec_kill_on_pdeath()
S & print(S &os, JobReport::InputFile const &f)
def __init__(self, name, opts, kwargs)
def discover_latest(self)
static std::string join(char **cmd)
def start_run(self, current)
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run