3 from __future__
import print_function
21 log_format =
'%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s' 22 logging.basicConfig(format=log_format, level=logging.INFO)
23 root_log = logging.getLogger()
36 with open(fp,
"wb")
as f:
39 self.
log.
info(
"Written control file: %s", fp)
43 with open(fp,
"rb")
as f:
48 return logging.getLogger(self.
name)
58 libc = ctypes.CDLL(
"libc.so.6")
60 libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
65 re_pattern = re.compile(
r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')
73 for f
in os.listdir(self.
input):
76 run, lumi, stream, stream_source = r.groups()
77 run, lumi =
int(run),
int(lumi)
81 elif run_found != run:
82 raise Exception(
"Files from multiple runs are not (yet) supported for as playback input.")
84 lumi_dct = self.
lumi_found.setdefault(lumi, {
'streams': {} })
85 lumi_dct[
"streams"][stream] = (f, stream_source)
87 streams_found.add(stream)
90 raise Exception(
"Playback files not found.")
95 self.
log.
info(
"Found run %s, will map output to run %s", run_found, self.
run)
96 self.
log.
info(
"Found %d lumisections with %d files", len(self.
lumi_found), len(files_found))
97 self.
log.
info(
"Found %d streams: %s", len(streams_found), list(streams_found))
113 if not os.path.isdir(self.
output):
120 f.write(
"run_key = pp_run")
131 self.
log.
info(
"Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)
134 streams = lumi_dct[
"streams"]
137 return os.path.join(self.
input, f)
140 return os.path.join(self.
output, f)
142 written_files = set()
143 for stream, v
in streams.items():
144 jsn_orig_fn, stream_source = v
145 jsn_play_fn =
"run%06d_ls%04d_stream%s_%s.jsn" % (self.
run, play_lumi, stream, stream_source)
149 if stream.startswith(
"streamDQMHistograms"):
151 dat_play_fn =
"run%06d_ls%04d_stream%s_%s.%s" % (self.
run, play_lumi, stream, stream_source, ext)
154 with open(ijoin(jsn_orig_fn),
'r') as f: 155 jsn_data = json.load(f) 156 dat_orig_fn = jsn_data["data"][3]
159 if os.path.exists(ijoin(dat_orig_fn)):
160 self.
log.
info(
"C: %s -> %s", dat_orig_fn, dat_play_fn)
161 shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
163 written_files.add(dat_play_fn)
165 log.warning(
"Dat file is missing: %s", dat_orig_fn)
169 jsn_data[
"data"][3] = dat_play_fn
171 f = tempfile.NamedTemporaryFile(prefix=jsn_play_fn+
".", suffix=
".tmp", dir = self.
output, delete=
False)
173 json.dump(jsn_data, f)
176 os.rename(tmp_fp, ojoin(jsn_play_fn))
177 written_files.add(jsn_play_fn)
179 self.
log.
info(
"Copied %d files for lumi %06d", len(written_files), play_lumi)
185 self.
log.
info(
"Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
186 for f
in files_to_delete:
197 if (now - last_write) > self.
opts.playback_time_lumi:
200 if self.
opts.playback_nlumi > -1
and lumi_produced >= self.
opts.playback_nlumi:
207 eor_fn =
"run%06d_ls0000_EoR.jsn" % (self.
run, )
208 eor_fp = os.path.join(self.
output, eor_fn)
209 with open(eor_fp,
"w"):
212 self.
log.
info(
"Wrote EoR: %s", eor_fp)
218 logname="/var/log/hltd/pid/hlt_run$4_pid$$.log" 219 lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz" 220 #override the noclobber option by using >| operator for redirection - then keep appending to log 221 echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname 222 export http_proxy="http://cmsproxy.cms:3128" 223 export https_proxy="https://cmsproxy.cms:3128/" 224 export NO_PROXY=".cms" 228 source cmsset_default.sh >> $logname 232 eval `scram runtime -sh`; 235 #exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1 236 exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 239 start_dqm_job = start_dqm_job.replace(
"/var/log/hltd/pid/",
'{log_path}/')
240 start_dqm_job = start_dqm_job.replace(
" cmsRun ",
' {cmsRun} ')
243 RunDesc = collections.namedtuple(
'Run', [
'run',
'run_fp',
'global_fp',
'global_param'])
244 RunState = collections.namedtuple(
'RunState', [
'desc',
'proc'])
249 x = re.sub(
r'(.*)\.py',
r'\1', x)
250 x = re.sub(
r'(.*)_cfg',
r'\1', x)
251 x = re.sub(
r'(.*)-live',
r'\1', x)
252 x = re.sub(
r'(.*)_sourceclient',
r'\1', x)
253 x = re.sub(
r'(.*)_dqm',
r'\1', x)
255 x =
"".
join([c
for c
in x
if c.isalnum()])
260 fp = os.path.realpath(self.
cfg_file)
262 bn = os.path.basename(fp)
263 fp = os.path.dirname(fp)
269 raise Exception(
"Could not find the cmssw release area.")
285 template = start_dqm_job
286 body = template.format(log_path=self.
log_path, cmsRun=self.
opts.cmsRun)
291 cmsset_globs = [
"/afs/cern.ch/cms/cmsset_default.sh",
"/home/dqm*local/base/cmsset_default.sh"]
293 for t
in cmsset_globs:
299 if cmsset_target
is not None:
300 base = os.path.join(self.
home_path,
"base")
303 cmsset_link = os.path.join(base,
"cmsset_default.sh")
304 self.
log.
info(
"Linking : %s -> %s", cmsset_link, cmsset_target)
305 os.symlink(cmsset_target, cmsset_link)
307 self.
log.
warning(
"Couldn't find cmsset_default.sh, source it yourself!")
309 current_link = os.path.join(self.
home_path,
"current")
311 self.
log.
info(
"Linking : %s -> %s", current_link, target)
312 os.symlink(target, current_link)
318 self.
log.
error(
"Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
321 output_link = os.path.join(self.
home_path,
"output")
322 output_target = os.path.realpath(self.
opts.work_output)
323 target = os.path.relpath(output_target, self.
home_path)
324 self.
log.
info(
"Linking : %s -> %s", output_link, target)
325 os.symlink(target, output_link)
330 self.
log.
info(
"Linking : %s -> %s", cfg_link, target)
331 os.symlink(target, cfg_link)
341 if not os.path.isfile(self.
cfg_file):
356 args.append(
"slc6_amd64_gcc491")
358 args.append(
str(run))
361 args.append(
"runkey=pp_run")
366 re_run = re.compile(
r'run([0-9]+)')
367 re_global = re.compile(
r'\.run([0-9]+)\.global')
372 for x
in os.listdir(self.
ramdisk):
375 runs[
int(m.group(1))] = x
377 m = re_global.match(x)
379 globals[
int(m.group(1))] = x
382 run_set = set(runs.keys())
383 run_set = run_set.intersection(globals.keys())
385 if self.
opts.run < 0:
386 largest =
max(run_set)
388 largest = self.
opts.run
391 global_fp = os.path.join(self.
ramdisk, globals[largest])
392 with open(global_fp,
"r") as f: 393 global_param = f.read() 397 run_fp = os.path.join(self.
ramdisk, runs[largest]),
398 global_fp = global_fp,
399 global_param = global_param,
412 proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
423 self.
log.
info(
"Found latest run: %s", latest)
428 self.
log.
info(
"Run not found, waiting 1 sec.")
432 self.
log.
info(
"Process exitted: %s", r)
439 if __name__ ==
"__main__":
440 if len(sys.argv) == 2
and sys.argv[-1].endswith(
".pkl"):
445 sys.exit(ret
if ret
else 0)
448 subdirectories = [
"ramdisk",
"output",
"control",
"home",
"logs",
"dqm_monitoring"]
449 username = getpass.getuser()
451 parser = argparse.ArgumentParser(description=
"Emulate DQM@P5 environment and launch cmssw jobs.")
455 parser.add_argument(
"--work",
"-w", type=str, help=
"Working directory (used for inputs,outputs,monitoring and logs).", default=
"/tmp/pplay." + username)
456 parser.add_argument(
"--clean",
"-c", action=
"store_true", help=
"Clean work directories (if they are not set).", default=
False)
457 parser.add_argument(
"--dry",
"-n", action=
"store_true", help=
"Do not execute, just init.", default=
False)
459 work_group = parser.add_argument_group(
'Paths',
'Path options for cmssw jobs, auto generated if not specified.')
460 for subdirectory
in subdirectories:
461 work_group.add_argument(
"--work_%s" % subdirectory, type=str, help=
"Path for %s directory." % subdirectory, default=
None)
463 playback_group = parser.add_argument_group(
'Playback',
'Playback configuration/parameters.')
464 playback_group.add_argument(
"--playback",
"-p", type=str, metavar=
"PLAYBACK_INPUT_DIR", help=
"Enable playback (emulate file delivery, otherwise set work_input).", default=
None)
465 playback_group.add_argument(
"--playback_nlumi", type=int, help=
"Number of lumis to deliver, -1 for forever.", default=-1)
466 playback_group.add_argument(
"--playback_time_lumi", type=float, help=
"Number of seconds between lumisections.", default=23.3)
468 run_group = parser.add_argument_group(
'Run',
'Run configuration/parameters.')
469 run_group.add_argument(
"--run", type=int, help=
"Run number, -1 for autodiscovery.", default=-1)
470 run_group.add_argument(
"--cmsRun", type=str, help=
"cmsRun command to run, for igprof and so on.", default=
"cmsRun")
472 parser.add_argument(
'cmssw_configs', metavar=
'cmssw_cfg.py', type=str, nargs=
'*', help=
'List of cmssw jobs (clients).')
474 args = parser.parse_args()
476 if len(args.cmssw_configs)
and args.cmssw_configs[0] ==
"--":
478 args.cmssw_configs = args.cmssw_configs[1:]
480 for subdirectory
in subdirectories:
481 if getattr(args,
"work_" + subdirectory)
is None:
482 setattr(args,
"work_" + subdirectory, os.path.join(args.work, subdirectory))
484 path = getattr(args,
"work_" + subdirectory)
485 if args.clean
and os.path.isdir(path):
486 root_log.info(
"Removing directory: %s", path)
489 path = getattr(args,
"work_" + subdirectory)
490 if not os.path.isdir(path):
493 root_log.info(
"Using directory: %s", path)
504 applets.append(playback)
506 for cfg
in args.cmssw_configs:
508 applets.append(cfg_a)
510 if len(applets) == 0:
511 sys.stderr.write(
"At least one process should be specified, use --playback and/or cmssw_configs options.\n")
515 fn =
"%s_%s.pkl" % (a.name, hex(
id(a)))
516 a.write(os.path.join(args.work_control, fn))
525 args = [os.path.realpath(__file__), fp]
526 a.control_proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
530 a.control_proc.wait()
def preexec_kill_on_pdeath()
def __init__(self, name, opts, kwargs)
def discover_latest(self)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
static std::string join(char **cmd)
std::pair< typename Association::data_type::first_type, double > match(Reference key, Association association, bool bestMatchByMaxValue)
Generic matching function.
def start_run(self, current)