CMS 3D CMS Logo

personalPlayback.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 
3 from __future__ import print_function
4 import os
5 import sys
6 import argparse
7 import asyncore
8 import pickle
9 import logging
10 import subprocess
11 import shutil
12 import re
13 import collections
14 import json
15 import tempfile
16 import signal
17 import time
18 import glob
19 
20 # Utilities
21 log_format = '%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s'
22 logging.basicConfig(format=log_format, level=logging.INFO)
23 root_log = logging.getLogger()
24 
25 class Applet(object):
26  def __init__(self, name, opts, **kwargs):
27  self.name = name
28  self.opts = opts
29  self.kwargs = kwargs
30 
31  self.do_init()
32 
33  def write(self, fp):
34  self.control_fp = fp
35 
36  with open(fp, "wb") as f:
37  pickle.dump(self, f)
38 
39  self.log.info("Written control file: %s", fp)
40 
41  @staticmethod
42  def read(fp):
43  with open(fp, "rb") as f:
44  return pickle.load(f)
45 
46  @property
47  def log(self):
48  return logging.getLogger(self.name)
49 
50  def do_init(self):
51  pass
52 
53  def do_exec(self):
54  pass
55 
57  import ctypes
58  libc = ctypes.CDLL("libc.so.6")
59  PR_SET_PDEATHSIG = 1
60  libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
61 
62 # Actual implementation of the workers
63 
65  re_pattern = re.compile(r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')
66 
67  def discover_files(self):
68  self.lumi_found = {}
69 
70  files_found = set()
71  streams_found = set()
72  run_found = None
73  for f in os.listdir(self.input):
74  r = self.re_pattern.match(f)
75  if r:
76  run, lumi, stream, stream_source = r.groups()
77  run, lumi = int(run), int(lumi)
78 
79  if run_found is None:
80  run_found = run
81  elif run_found != run:
82  raise Exception("Files from multiple runs are not (yet) supported for as playback input.")
83 
84  lumi_dct = self.lumi_found.setdefault(lumi, { 'streams': {} })
85  lumi_dct["streams"][stream] = (f, stream_source)
86  files_found.add(f)
87  streams_found.add(stream)
88 
89  if run_found is None:
90  raise Exception("Playback files not found.")
91 
92  if self.run < 0:
93  self.run = run_found
94 
95  self.log.info("Found run %s, will map output to run %s", run_found, self.run)
96  self.log.info("Found %d lumisections with %d files", len(self.lumi_found), len(files_found))
97  self.log.info("Found %d streams: %s", len(streams_found), list(streams_found))
98 
99  self.lumi_order = list(self.lumi_found.keys())
100  self.lumi_order.sort()
101  self.log.info("Lumi order: %s", str(self.lumi_order))
102 
103  def do_init(self):
104  # check if our input directory is okay
105  self.input = self.opts.playback
106  self.ramdisk = self.opts.work_ramdisk
107  self.run = self.opts.run
108  self.log.info("Using input directory: %s", self.input)
109 
110  self.discover_files()
111 
112  self.output = os.path.join(self.ramdisk, "run%06d" % self.run)
113  if not os.path.isdir(self.output):
114  os.makedirs(self.output)
115  self.log.info("Using output directory: %s", self.output)
116 
117  self.global_file = os.path.join(self.ramdisk, ".run%06d.global" % self.run)
118  self.log.info("Writing: %s", self.global_file)
119  with open(self.global_file, "w") as f:
120  f.write("run_key = pp_run")
121 
122  self.lumi_backlog = collections.deque()
125 
126  def do_create_lumi(self):
127  orig_lumi = self.lumi_order[(self.next_lumi_index - 1) % len(self.lumi_order)]
128  play_lumi = self.next_lumi_index;
129  self.next_lumi_index += 1
130 
131  self.log.info("Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)
132 
133  lumi_dct = self.lumi_found[orig_lumi]
134  streams = lumi_dct["streams"]
135 
136  def ijoin(f):
137  return os.path.join(self.input, f)
138 
139  def ojoin(f):
140  return os.path.join(self.output, f)
141 
142  written_files = set()
143  for stream, v in streams.items():
144  jsn_orig_fn, stream_source = v
145  jsn_play_fn = "run%06d_ls%04d_stream%s_%s.jsn" % (self.run, play_lumi, stream, stream_source)
146 
147  # define dat filename
148  ext = "dat"
149  if stream.startswith("streamDQMHistograms"):
150  ext = "pb"
151  dat_play_fn = "run%06d_ls%04d_stream%s_%s.%s" % (self.run, play_lumi, stream, stream_source, ext)
152 
153  # read the original file name, for copying
154  with open(ijoin(jsn_orig_fn), 'r') as f:
155  jsn_data = json.load(f)
156  dat_orig_fn = jsn_data["data"][3]
157 
158  # copy the data file
159  if os.path.exists(ijoin(dat_orig_fn)):
160  self.log.info("C: %s -> %s", dat_orig_fn, dat_play_fn)
161  shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
162 
163  written_files.add(dat_play_fn)
164  else:
165  log.warning("Dat file is missing: %s", dat_orig_fn)
166 
167  # write a new json file point to a different data file
168  # this has to be atomic!
169  jsn_data["data"][3] = dat_play_fn
170 
171  f = tempfile.NamedTemporaryFile(prefix=jsn_play_fn+ ".", suffix=".tmp", dir = self.output, delete=False)
172  tmp_fp = f.name
173  json.dump(jsn_data, f)
174  f.close()
175 
176  os.rename(tmp_fp, ojoin(jsn_play_fn))
177  written_files.add(jsn_play_fn)
178 
179  self.log.info("Copied %d files for lumi %06d", len(written_files), play_lumi)
180 
181  self.lumi_backlog.append((play_lumi, written_files))
182  while len(self.lumi_backlog) > self.lumi_backlog_size:
183  old_lumi, files_to_delete = self.lumi_backlog.popleft()
184 
185  self.log.info("Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
186  for f in files_to_delete:
187  os.unlink(ojoin(f))
188 
189  def do_exec(self):
190  last_write = 0
191  lumi_produced = 0
192 
193  while True:
194  time.sleep(1)
195 
196  now = time.time()
197  if (now - last_write) > self.opts.playback_time_lumi:
198  last_write = now
199 
200  if self.opts.playback_nlumi > -1 and lumi_produced >= self.opts.playback_nlumi:
201  break
202 
203  self.do_create_lumi()
204  lumi_produced += 1
205 
206  # write eor
207  eor_fn = "run%06d_ls0000_EoR.jsn" % (self.run, )
208  eor_fp = os.path.join(self.output, eor_fn)
209  with open(eor_fp, "w"):
210  pass
211 
212  self.log.info("Wrote EoR: %s", eor_fp)
213 
214 start_dqm_job = """
215 #!/bin/env /bin/bash
216 set -x #echo on
217 TODAY=$(date)
218 logname="/var/log/hltd/pid/hlt_run$4_pid$$.log"
219 lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz"
220 #override the noclobber option by using >| operator for redirection - then keep appending to log
221 echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname
222 export http_proxy="http://cmsproxy.cms:3128"
223 export https_proxy="https://cmsproxy.cms:3128/"
224 export NO_PROXY=".cms"
225 export SCRAM_ARCH=$2
226 cd $1
227 cd base
228 source cmsset_default.sh >> $logname
229 cd $1
230 cd current
231 pwd >> $logname 2>&1
232 eval `scram runtime -sh`;
233 cd $3;
234 pwd >> $logname 2>&1
235 #exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1
236 exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8
237 """
238 
239 start_dqm_job = start_dqm_job.replace("/var/log/hltd/pid/", '{log_path}/')
240 start_dqm_job = start_dqm_job.replace(" cmsRun ", ' {cmsRun} ')
241 
242 
243 RunDesc = collections.namedtuple('Run', ['run', 'run_fp', 'global_fp', 'global_param'])
244 RunState = collections.namedtuple('RunState', ['desc', 'proc'])
245 
247  def _set_name(self):
248  x = os.path.basename(self.cfg_file)
249  x = re.sub(r'(.*)\.py', r'\1', x)
250  x = re.sub(r'(.*)_cfg', r'\1', x)
251  x = re.sub(r'(.*)-live', r'\1', x)
252  x = re.sub(r'(.*)_sourceclient', r'\1', x)
253  x = re.sub(r'(.*)_dqm', r'\1', x)
254 
255  x = "".join([c for c in x if c.isalnum()])
256  self.tag = x
257  self.name = "cmssw_%s" % x
258 
259  def _find_release(self):
260  fp = os.path.realpath(self.cfg_file)
261  while len(fp) > 3:
262  bn = os.path.basename(fp)
263  fp = os.path.dirname(fp)
264 
265  if bn == "src":
266  break
267 
268  if len(fp) <= 3:
269  raise Exception("Could not find the cmssw release area.")
270 
271  self.cmsenv_path = fp
272  self.log.info("cmsenv path: %s", self.cmsenv_path)
273 
274  def _prepare_files(self):
275  self.home_path = os.path.join(self.opts.work_home, "%s_%s" % (self.name, hex(id(self))))
276  self.home_path = os.path.realpath(self.home_path)
277  os.makedirs(self.home_path)
278 
279  self.log_path = self.opts.work_logs
280  self.log.info("logs path: %s", self.log_path)
281 
282  self.exec_file = os.path.join(self.home_path, "startDqmRun.sh")
283  self.log.info("Creating: %s", self.exec_file)
284  f = open(self.exec_file, "w")
285  template = start_dqm_job
286  body = template.format(log_path=self.log_path, cmsRun=self.opts.cmsRun)
287  f.write(body)
288  f.close()
289  os.chmod(self.exec_file, 0o755)
290 
291  cmsset_globs = ["/afs/cern.ch/cms/cmsset_default.sh", "/home/dqm*local/base/cmsset_default.sh"]
292  cmsset_target = None
293  for t in cmsset_globs:
294  files = glob.glob(t)
295  for f in files:
296  cmsset_target = f
297  break
298 
299  if cmsset_target is not None:
300  base = os.path.join(self.home_path, "base")
301  os.makedirs(base)
302 
303  cmsset_link = os.path.join(base, "cmsset_default.sh")
304  self.log.info("Linking : %s -> %s", cmsset_link, cmsset_target)
305  os.symlink(cmsset_target, cmsset_link)
306  else:
307  self.log.warning("Couldn't find cmsset_default.sh, source it yourself!")
308 
309  current_link = os.path.join(self.home_path, "current")
310  target = os.path.relpath(self.cmsenv_path, self.home_path)
311  self.log.info("Linking : %s -> %s", current_link, target)
312  os.symlink(target, current_link)
313 
314  # check if current is outside the release directory
315  # otherwise scram gets stuck forever
316  cp = os.path.commonprefix([self.home_path, self.cmsenv_path])
317  if self.cmsenv_path == cp:
318  self.log.error("Working directory (incl. control directory), have to be outside the cmssw release. Otherwise scram fails due to recursive links.")
319  raise Exception("Invalid home_path: %s" % self.home_path)
320 
321  output_link = os.path.join(self.home_path, "output")
322  output_target = os.path.realpath(self.opts.work_output)
323  target = os.path.relpath(output_target, self.home_path)
324  self.log.info("Linking : %s -> %s", output_link, target)
325  os.symlink(target, output_link)
326  self.output_path = output_link
327 
328  cfg_link = os.path.join(self.home_path, os.path.basename(self.cfg_file))
329  target = self.cfg_fp
330  self.log.info("Linking : %s -> %s", cfg_link, target)
331  os.symlink(target, cfg_link)
332  self.cfg_link = cfg_link
333 
334 
335  def do_init(self):
336  # check if our input directory is okay
337  self.ramdisk = self.opts.work_ramdisk
338  self.run = self.opts.run
339  self.cfg_file = self.kwargs["cfg_file"]
340 
341  if not os.path.isfile(self.cfg_file):
342  raise Exception("Configuration file not found: %s" % self.cfg_file)
343 
344  self.cfg_fp = os.path.realpath(self.cfg_file)
345  self.ramdisk_fp = os.path.realpath(self.ramdisk)
346 
347  self._set_name()
348  self._find_release()
349  self._prepare_files()
350 
351  def make_args(self, run):
352  args = []
353  args.append("bash") # arg 0
354  args.append(self.exec_file) # arg 0
355  args.append(self.home_path) # home path
356  args.append("slc6_amd64_gcc491") # release
357  args.append(self.output_path) # cwd/output path
358  args.append(str(run)) # run
359  args.append(self.ramdisk_fp) # ramdisk
360  args.append(self.cfg_link) # cmsRun arg 1
361  args.append("runkey=pp_run") # cmsRun arg 2
362 
363  return args
364 
365  def discover_latest(self):
366  re_run = re.compile(r'run([0-9]+)')
367  re_global = re.compile(r'\.run([0-9]+)\.global')
368 
369  # find runs
370  runs = {}
371  globals = {}
372  for x in os.listdir(self.ramdisk):
373  m = re_run.match(x)
374  if m:
375  runs[int(m.group(1))] = x
376 
377  m = re_global.match(x)
378  if m:
379  globals[int(m.group(1))] = x
380 
381  # find max global for which there is a run directory
382  run_set = set(runs.keys())
383  run_set = run_set.intersection(globals.keys())
384 
385  if self.opts.run < 0:
386  largest = max(run_set)
387  else:
388  largest = self.opts.run
389 
390  #self.log.info("Largest: %s", largest)
391  global_fp = os.path.join(self.ramdisk, globals[largest])
392  with open(global_fp, "r") as f:
393  global_param = f.read()
394 
395  return RunDesc(
396  run = largest,
397  run_fp = os.path.join(self.ramdisk, runs[largest]),
398  global_fp = global_fp,
399  global_param = global_param,
400  )
401 
402  def start_run(self, current):
403  old_state = self.current_state
404 
405  # kill the old run
406  # nope, since it involves eof and i am lazy
407  if old_state:
408  return
409 
410  args = self.make_args(current.run)
411  self.log.info("Executing: %s", " ".join(args))
412  proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
413  self.current_state = RunState(desc=current, proc=proc)
414 
415  def do_exec(self):
416  time.sleep(1)
417 
418  self.current_state = None
419 
420  while True:
421  latest = self.discover_latest()
422  if self.current_state is None or latest != self.current_state.desc:
423  self.log.info("Found latest run: %s", latest)
424 
425  self.start_run(latest)
426 
427  if not self.current_state:
428  self.log.info("Run not found, waiting 1 sec.")
429  else:
430  r = self.current_state.proc.poll()
431  if r is not None:
432  self.log.info("Process exitted: %s", r)
433 
434  return 0
435 
436  time.sleep(1)
437 
438 import getpass
439 if __name__ == "__main__":
440  if len(sys.argv) == 2 and sys.argv[-1].endswith(".pkl"):
441  f = sys.argv[-1]
442  obj = Applet.read(f)
443 
444  ret = obj.do_exec()
445  sys.exit(ret if ret else 0)
446 
447  # control -> interal files and home directory for the run
448  subdirectories = ["ramdisk", "output", "control", "home", "logs", "dqm_monitoring"]
449  username = getpass.getuser()
450 
451  parser = argparse.ArgumentParser(description="Emulate DQM@P5 environment and launch cmssw jobs.")
452  #parser.add_argument('-q', action='store_true', help="Don't write to stdout, just the log file.")
453  #parser.add_argument("log", type=str, help="Filename to write.", metavar="<logfile.gz>")
454 
455  parser.add_argument("--work", "-w", type=str, help="Working directory (used for inputs,outputs,monitoring and logs).", default="/tmp/pplay." + username)
456  parser.add_argument("--clean", "-c", action="store_true", help="Clean work directories (if they are not set).", default=False)
457  parser.add_argument("--dry", "-n", action="store_true", help="Do not execute, just init.", default=False)
458 
459  work_group = parser.add_argument_group('Paths', 'Path options for cmssw jobs, auto generated if not specified.')
460  for subdirectory in subdirectories:
461  work_group.add_argument("--work_%s" % subdirectory, type=str, help="Path for %s directory." % subdirectory, default=None)
462 
463  playback_group = parser.add_argument_group('Playback', 'Playback configuration/parameters.')
464  playback_group.add_argument("--playback", "-p", type=str, metavar="PLAYBACK_INPUT_DIR", help="Enable playback (emulate file delivery, otherwise set work_input).", default=None)
465  playback_group.add_argument("--playback_nlumi", type=int, help="Number of lumis to deliver, -1 for forever.", default=-1)
466  playback_group.add_argument("--playback_time_lumi", type=float, help="Number of seconds between lumisections.", default=23.3)
467 
468  run_group = parser.add_argument_group('Run', 'Run configuration/parameters.')
469  run_group.add_argument("--run", type=int, help="Run number, -1 for autodiscovery.", default=-1)
470  run_group.add_argument("--cmsRun", type=str, help="cmsRun command to run, for igprof and so on.", default="cmsRun")
471 
472  parser.add_argument('cmssw_configs', metavar='cmssw_cfg.py', type=str, nargs='*', help='List of cmssw jobs (clients).')
473 
474  args = parser.parse_args()
475 
476  if len(args.cmssw_configs) and args.cmssw_configs[0] == "--":
477  # compat with 2.6
478  args.cmssw_configs = args.cmssw_configs[1:]
479 
480  for subdirectory in subdirectories:
481  if getattr(args, "work_" + subdirectory) is None:
482  setattr(args, "work_" + subdirectory, os.path.join(args.work, subdirectory))
483 
484  path = getattr(args, "work_" + subdirectory)
485  if args.clean and os.path.isdir(path):
486  root_log.info("Removing directory: %s", path)
487  shutil.rmtree(path)
488 
489  path = getattr(args, "work_" + subdirectory)
490  if not os.path.isdir(path):
491  os.makedirs(path)
492 
493  root_log.info("Using directory: %s", path)
494 
495  print("*"*80)
496  print(args)
497  print("*"*80)
498 
499  applets = []
500 
501  if args.playback:
502  # launch playback service
503  playback = Playback("playback_emu", opts=args)
504  applets.append(playback)
505 
506  for cfg in args.cmssw_configs:
507  cfg_a = FrameworkJob("framework_job", opts=args, cfg_file=cfg)
508  applets.append(cfg_a)
509 
510  if len(applets) == 0:
511  sys.stderr.write("At least one process should be specified, use --playback and/or cmssw_configs options.\n")
512 
513  # serialize them into control directory
514  for a in applets:
515  fn = "%s_%s.pkl" % (a.name, hex(id(a)))
516  a.write(os.path.join(args.work_control, fn))
517 
518  if args.dry:
519  sys.exit(0)
520 
521  # launch each in a separate subprocess
522  for a in applets:
523  fp = a.control_fp
524 
525  args = [os.path.realpath(__file__), fp]
526  a.control_proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
527 
528  for a in applets:
529  # wait till everything finishes
530  a.control_proc.wait()
531 
static const TGPicture * info(bool iBackgroundIsBlack)
def __init__(self, name, opts, kwargs)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
static std::string join(char **cmd)
Definition: RemoteFile.cc:21