
personalPlayback.py
1 #!/usr/bin/env python
2 
3 import os
4 import sys
5 import argparse
6 import asyncore
7 import pickle
8 import logging
9 import subprocess
10 import shutil
11 import re
12 import collections
13 import json
14 import tempfile
15 import signal
16 import time
17 import glob
18 
19 # Utilities
20 log_format = '%(asctime)s: %(name)-20s - %(levelname)-8s - %(message)s'
21 logging.basicConfig(format=log_format, level=logging.INFO)
22 root_log = logging.getLogger()
23 
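# An Applet is one self-contained unit of work: the controller pickles it into a
# control file and re-launches this script on that file, so each applet runs in
# its own process. Subclasses override do_init() / do_exec().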
24 class Applet(object):
25  def __init__(self, name, opts, **kwargs):
26  self.name = name
27  self.opts = opts
28  self.kwargs = kwargs
29 
30  self.do_init()
31 
32  def write(self, fp):
33  self.control_fp = fp
34 
35  with open(fp, "wb") as f:
36  pickle.dump(self, f)
37 
38  self.log.info("Written control file: %s", fp)
39 
40  @staticmethod
41  def read(fp):
42  with open(fp, "rb") as f:
43  return pickle.load(f)
44 
45  @property
46  def log(self):
47  return logging.getLogger(self.name)
48 
49  def do_init(self):
50  pass
51 
52  def do_exec(self):
53  pass
54 
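# Used as a subprocess preexec_fn: via prctl(PR_SET_PDEATHSIG) the Linux kernel
# sends SIGKILL to the child if the parent dies, so no orphaned jobs are left
# behind. Linux-only (libc.so.6 is loaded directly through ctypes).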
55 def preexec_kill_on_pdeath():
56  import ctypes
57  libc = ctypes.CDLL("libc.so.6")
58  PR_SET_PDEATHSIG = 1
59  libc.prctl(PR_SET_PDEATHSIG, signal.SIGKILL)
60 
61 # Actual implementation of the workers
62 
63 class Playback(Applet):
64  re_pattern = re.compile(r'run([0-9]+)_ls([0-9]+)_stream([A-Za-z0-9]+)_([A-Za-z0-9_-]+)\.jsn')
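 # e.g. "run000123_ls0004_streamDQM_StorageManager.jsn" -> run 123, lumi 4,
 # stream "DQM", source "StorageManager" (illustrative filename only)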
65 
66  def discover_files(self):
67  self.lumi_found = {}
68 
69  files_found = set()
70  streams_found = set()
71  run_found = None
72  for f in os.listdir(self.input):
73  r = self.re_pattern.match(f)
74  if r:
75  run, lumi, stream, stream_source = r.groups()
76  run, lumi = int(run), int(lumi)
77 
78  if run_found is None:
79  run_found = run
80  elif run_found != run:
81  raise Exception("Files from multiple runs are not (yet) supported as playback input.")
82 
83  lumi_dct = self.lumi_found.setdefault(lumi, { 'streams': {} })
84  lumi_dct["streams"][stream] = (f, stream_source)
85  files_found.add(f)
86  streams_found.add(stream)
87 
88  if run_found is None:
89  raise Exception("Playback files not found.")
90 
91  if self.run < 0:
92  self.run = run_found
93 
94  self.log.info("Found run %s, will map output to run %s", run_found, self.run)
95  self.log.info("Found %d lumisections with %d files", len(self.lumi_found), len(files_found))
96  self.log.info("Found %d streams: %s", len(streams_found), list(streams_found))
97 
98  self.lumi_order = list(self.lumi_found.keys())
99  self.lumi_order.sort()
100  self.log.info("Lumi order: %s", str(self.lumi_order))
101 
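 # do_init() validates the playback input, indexes the available lumisections,
 # creates the output run directory on the work "ramdisk" and writes the hidden
 # .runNNNNNN.global marker that discover_latest() on the FrameworkJob side
 # looks for.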
102  def do_init(self):
103  # check if our input directory is okay
104  self.input = self.opts.playback
105  self.ramdisk = self.opts.work_ramdisk
106  self.run = self.opts.run
107  self.log.info("Using input directory: %s", self.input)
108 
109  self.discover_files()
110 
111  self.output = os.path.join(self.ramdisk, "run%06d" % self.run)
112  if not os.path.isdir(self.output):
113  os.makedirs(self.output)
114  self.log.info("Using output directory: %s", self.output)
115 
116  self.global_file = os.path.join(self.ramdisk, ".run%06d.global" % self.run)
117  self.log.info("Writing: %s", self.global_file)
118  with open(self.global_file, "w") as f:
119  f.write("run_key = pp_run")
120 
121  self.lumi_backlog = collections.deque()
122  self.lumi_backlog_size = 10  # assumed default; the original value is not visible in this listing
123  self.next_lumi_index = 1
124 
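 # do_create_lumi() copies the next input lumisection into the playback run:
 # the .dat/.pb file is copied first, then the .jsn is written through a temp
 # file and os.rename() so a consumer never sees a JSON pointing at a missing
 # data file. Lumisections that fall out of the backlog window are deleted
 # again to keep the ramdisk small.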
125  def do_create_lumi(self):
126  orig_lumi = self.lumi_order[(self.next_lumi_index - 1) % len(self.lumi_order)]
127  play_lumi = self.next_lumi_index
128  self.next_lumi_index += 1
129 
130  self.log.info("Start copying lumi (original) %06d -> %06d (playback)", orig_lumi, play_lumi)
131 
132  lumi_dct = self.lumi_found[orig_lumi]
133  streams = lumi_dct["streams"]
134 
135  def ijoin(f):
136  return os.path.join(self.input, f)
137 
138  def ojoin(f):
139  return os.path.join(self.output, f)
140 
141  written_files = set()
142  for stream, v in streams.items():
143  jsn_orig_fn, stream_source = v
144  jsn_play_fn = "run%06d_ls%04d_stream%s_%s.jsn" % (self.run, play_lumi, stream, stream_source)
145 
146  # define dat filename
147  ext = "dat"
148  if stream.startswith("streamDQMHistograms"):
149  ext = "pb"
150  dat_play_fn = "run%06d_ls%04d_stream%s_%s.%s" % (self.run, play_lumi, stream, stream_source, ext)
151 
152  # read the original file name, for copying
153  with open(ijoin(jsn_orig_fn), 'r') as f:
154  jsn_data = json.load(f)
155  dat_orig_fn = jsn_data["data"][3]
156 
157  # copy the data file
158  if os.path.exists(ijoin(dat_orig_fn)):
159  self.log.info("C: %s -> %s", dat_orig_fn, dat_play_fn)
160  shutil.copyfile(ijoin(dat_orig_fn), ojoin(dat_play_fn))
161 
162  written_files.add(dat_play_fn)
163  else:
164  self.log.warning("Dat file is missing: %s", dat_orig_fn)
165 
166  # write a new json file pointing to a different data file
167  # this has to be atomic!
168  jsn_data["data"][3] = dat_play_fn
169 
170  f = tempfile.NamedTemporaryFile(prefix=jsn_play_fn+ ".", suffix=".tmp", dir = self.output, delete=False)
171  tmp_fp = f.name
172  json.dump(jsn_data, f)
173  f.close()
174 
175  os.rename(tmp_fp, ojoin(jsn_play_fn))
176  written_files.add(jsn_play_fn)
177 
178  self.log.info("Copied %d files for lumi %06d", len(written_files), play_lumi)
179 
180  self.lumi_backlog.append((play_lumi, written_files))
181  while len(self.lumi_backlog) > self.lumi_backlog_size:
182  old_lumi, files_to_delete = self.lumi_backlog.popleft()
183 
184  self.log.info("Deleting %d files for old lumi %06d", len(files_to_delete), old_lumi)
185  for f in files_to_delete:
186  os.unlink(ojoin(f))
187 
188  def do_exec(self):
189  last_write = 0
190  lumi_produced = 0
191 
192  while True:
193  time.sleep(1)
194 
195  now = time.time()
196  if (now - last_write) > self.opts.playback_time_lumi:
197  last_write = now
198 
199  if self.opts.playback_nlumi > -1 and lumi_produced >= self.opts.playback_nlumi:
200  break
201 
202  self.do_create_lumi()
203  lumi_produced += 1
204 
205  # write eor
206  eor_fn = "run%06d_ls0000_EoR.jsn" % (self.run, )
207  eor_fp = os.path.join(self.output, eor_fn)
208  with open(eor_fp, "w"):
209  pass
210 
211  self.log.info("Wrote EoR: %s", eor_fp)
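 # The empty runNNNNNN_ls0000_EoR.jsn acts as the end-of-run marker of the
 # file-based delivery protocol: consumers stop waiting for further lumisections.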
212 
213 start_dqm_job = """
214 #!/bin/env /bin/bash
215 set -x #echo on
216 TODAY=$(date)
217 logname="/var/log/hltd/pid/hlt_run$4_pid$$.log"
218 lognamez="/var/log/hltd/pid/hlt_run$4_pid$$_gzip.log.gz"
219 #override the noclobber option by using >| operator for redirection - then keep appending to log
220 echo startDqmRun invoked $TODAY with arguments $1 $2 $3 $4 $5 $6 $7 $8 >| $logname
221 export http_proxy="http://cmsproxy.cms:3128"
222 export https_proxy="https://cmsproxy.cms:3128/"
223 export NO_PROXY=".cms"
224 export SCRAM_ARCH=$2
225 cd $1
226 cd base
227 source cmsset_default.sh >> $logname
228 cd $1
229 cd current
230 pwd >> $logname 2>&1
231 eval `scram runtime -sh`;
232 cd $3;
233 pwd >> $logname 2>&1
234 #exec esMonitoring.py -z $lognamez cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8 >> $logname 2>&1
235 exec esMonitoring.py cmsRun `readlink $6` runInputDir=$5 runNumber=$4 $7 $8
236 """
237 
238 start_dqm_job = start_dqm_job.replace("/var/log/hltd/pid/", '{log_path}/')
239 start_dqm_job = start_dqm_job.replace(" cmsRun ", ' {cmsRun} ')
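# The two replace() calls above turn the hard-coded hltd log location and the
# bare "cmsRun" token into {log_path} / {cmsRun} placeholders; _prepare_files()
# fills them per job, so logs go to the local work area and --cmsRun can point
# to a wrapper (e.g. igprof) instead of plain cmsRun.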
240 
241 
242 RunDesc = collections.namedtuple('Run', ['run', 'run_fp', 'global_fp', 'global_param'])
243 RunState = collections.namedtuple('RunState', ['desc', 'proc'])
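# RunDesc describes one run found on the ramdisk (run number, run directory,
# .global file and its contents); RunState ties that description to the running
# subprocess. Comparing RunDesc tuples is how FrameworkJob.do_exec() detects a
# new run.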
244 
245 class FrameworkJob(Applet):
246  def _set_name(self):
247  x = os.path.basename(self.cfg_file)
248  x = re.sub(r'(.*)\.py', r'\1', x)
249  x = re.sub(r'(.*)_cfg', r'\1', x)
250  x = re.sub(r'(.*)-live', r'\1', x)
251  x = re.sub(r'(.*)_sourceclient', r'\1', x)
252  x = re.sub(r'(.*)_dqm', r'\1', x)
253 
254  x = "".join([c for c in x if c.isalnum()])
255  self.tag = x
256  self.name = "cmssw_%s" % x
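 # For example (illustrative only), "beam_dqm_sourceclient-live_cfg.py" is
 # stripped down to the tag "beam" and the applet name becomes "cmssw_beam".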
257 
258  def _find_release(self):
259  fp = os.path.realpath(self.cfg_file)
260  while len(fp) > 3:
261  bn = os.path.basename(fp)
262  fp = os.path.dirname(fp)
263 
264  if bn == "src":
265  break
266 
267  if len(fp) <= 3:
268  raise Exception("Could not find the cmssw release area.")
269 
270  self.cmsenv_path = fp
271  self.log.info("cmsenv path: %s", self.cmsenv_path)
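 # The directory just above "src" is taken as the CMSSW release area; the
 # startDqmRun.sh script later runs cmsenv/scram from there through the
 # "current" symlink created in _prepare_files().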
272 
273  def _prepare_files(self):
274  self.home_path = os.path.join(self.opts.work_home, "%s_%s" % (self.name, hex(id(self))))
275  self.home_path = os.path.realpath(self.home_path)
276  os.makedirs(self.home_path)
277 
278  self.log_path = self.opts.work_logs
279  self.log.info("logs path: %s", self.log_path)
280 
281  self.exec_file = os.path.join(self.home_path, "startDqmRun.sh")
282  self.log.info("Creating: %s", self.exec_file)
283  f = open(self.exec_file, "w")
284  template = start_dqm_job
285  body = template.format(log_path=self.log_path, cmsRun=self.opts.cmsRun)
286  f.write(body)
287  f.close()
288  os.chmod(self.exec_file, 0o755)
289 
290  cmsset_globs = ["/afs/cern.ch/cms/cmsset_default.sh", "/home/dqm*local/base/cmsset_default.sh"]
291  cmsset_target = None
292  for t in cmsset_globs:
293  files = glob.glob(t)
294  if files:
295  cmsset_target = files[0]
296  break
297 
298  if cmsset_target is not None:
299  base = os.path.join(self.home_path, "base")
300  os.makedirs(base)
301 
302  cmsset_link = os.path.join(base, "cmsset_default.sh")
303  self.log.info("Linking : %s -> %s", cmsset_link, cmsset_target)
304  os.symlink(cmsset_target, cmsset_link)
305  else:
306  self.log.warning("Couldn't find cmsset_default.sh, source it yourself!")
307 
308  current_link = os.path.join(self.home_path, "current")
309  target = os.path.relpath(self.cmsenv_path, self.home_path)
310  self.log.info("Linking : %s -> %s", current_link, target)
311  os.symlink(target, current_link)
312 
313  # check if current is outside the release directory
314  # otherwise scram gets stuck forever
315  cp = os.path.commonprefix([self.home_path, self.cmsenv_path])
316  if self.cmsenv_path == cp:
317  self.log.error("Working directory (incl. the control directory) has to be outside the CMSSW release, otherwise scram fails due to recursive links.")
318  raise Exception("Invalid home_path: %s" % self.home_path)
319 
320  output_link = os.path.join(self.home_path, "output")
321  output_target = os.path.realpath(self.opts.work_output)
322  target = os.path.relpath(output_target, self.home_path)
323  self.log.info("Linking : %s -> %s", output_link, target)
324  os.symlink(target, output_link)
325  self.output_path = output_link
326 
327  cfg_link = os.path.join(self.home_path, os.path.basename(self.cfg_file))
328  target = self.cfg_fp
329  self.log.info("Linking : %s -> %s", cfg_link, target)
330  os.symlink(target, cfg_link)
331  self.cfg_link = cfg_link
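 # After _prepare_files() the per-job home directory contains: startDqmRun.sh,
 # base/cmsset_default.sh (symlink), current -> CMSSW release area,
 # output -> work_output, and a symlink to the configuration file itself.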
332 
333 
334  def do_init(self):
335  # check if our input directory is okay
336  self.ramdisk = self.opts.work_ramdisk
337  self.run = self.opts.run
338  self.cfg_file = self.kwargs["cfg_file"]
339 
340  if not os.path.isfile(self.cfg_file):
341  raise Exception("Configuration file not found: %s" % self.cfg_file)
342 
343  self.cfg_fp = os.path.realpath(self.cfg_file)
344  self.ramdisk_fp = os.path.realpath(self.ramdisk)
345 
346  self._set_name()
347  self._find_release()
348  self._prepare_files()
349 
350  def make_args(self, run):
351  args = []
352  args.append("bash") # interpreter (argv[0] of the subprocess)
353  args.append(self.exec_file) # startDqmRun.sh ($0 inside the script)
354  args.append(self.home_path) # $1: home path
355  args.append("slc6_amd64_gcc491") # $2: SCRAM_ARCH (hard-coded)
356  args.append(self.output_path) # $3: cwd/output path
357  args.append(str(run)) # $4: run number
358  args.append(self.ramdisk_fp) # $5: ramdisk (runInputDir)
359  args.append(self.cfg_link) # $6: cmssw configuration (resolved via readlink)
360  args.append("runkey=pp_run") # $7: extra cmsRun argument
361 
362  return args
363 
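 # discover_latest() scans the ramdisk for runNNNNNN directories and matching
 # .runNNNNNN.global files, picks the highest run number (or the one forced
 # with --run) and returns it as a RunDesc.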
364  def discover_latest(self):
365  re_run = re.compile(r'run([0-9]+)')
366  re_global = re.compile(r'\.run([0-9]+)\.global')
367 
368  # find runs
369  runs = {}
370  globals = {}
371  for x in os.listdir(self.ramdisk):
372  m = re_run.match(x)
373  if m:
374  runs[int(m.group(1))] = x
375 
376  m = re_global.match(x)
377  if m:
378  globals[int(m.group(1))] = x
379 
380  # find max global for which there is a run directory
381  run_set = set(runs.keys())
382  run_set = run_set.intersection(globals.keys())
383 
384  if self.opts.run < 0:
385  largest = max(run_set)
386  else:
387  largest = self.opts.run
388 
389  #self.log.info("Largest: %s", largest)
390  global_fp = os.path.join(self.ramdisk, globals[largest])
391  with open(global_fp, "r") as f:
392  global_param = f.read()
393 
394  return RunDesc(
395  run = largest,
396  run_fp = os.path.join(self.ramdisk, runs[largest]),
397  global_fp = global_fp,
398  global_param = global_param,
399  )
400 
401  def start_run(self, current):
402  old_state = self.current_state
403 
404  # killing and restarting the old run is not implemented
405  # (it would need proper EoR handling), so keep the job that is already running
406  if old_state:
407  return
408 
409  args = self.make_args(current.run)
410  self.log.info("Executing: %s", " ".join(args))
411  proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
412  self.current_state = RunState(desc=current, proc=proc)
413 
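 # do_exec() polls the ramdisk once per second. When a run shows up it launches
 # cmsRun through startDqmRun.sh; only one job is ever started (start_run()
 # returns early if one is already running) and do_exec() returns as soon as
 # that process exits.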
414  def do_exec(self):
415  time.sleep(1)
416 
417  self.current_state = None
418 
419  while True:
420  latest = self.discover_latest()
421  if self.current_state is None or latest != self.current_state.desc:
422  self.log.info("Found latest run: %s", latest)
423 
424  self.start_run(latest)
425 
426  if not self.current_state:
427  self.log.info("Run not found, waiting 1 sec.")
428  else:
429  r = self.current_state.proc.poll()
430  if r is not None:
431  self.log.info("Process exited: %s", r)
432 
433  return 0
434 
435  time.sleep(1)
436 
437 import getpass
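# Worker mode: when re-invoked with a single .pkl argument the script unpickles
# the Applet written by the controller below and runs its do_exec(). Otherwise
# it acts as the controller: parse options, create the applets, pickle them into
# the control directory and respawn itself once per applet.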
438 if __name__ == "__main__":
439  if len(sys.argv) == 2 and sys.argv[-1].endswith(".pkl"):
440  f = sys.argv[-1]
441  obj = Applet.read(f)
442 
443  ret = obj.do_exec()
444  sys.exit(ret if ret else 0)
445 
446  # control -> internal files and home directory for the run
447  subdirectories = ["ramdisk", "output", "control", "home", "logs", "dqm_monitoring"]
448  username = getpass.getuser()
449 
450  parser = argparse.ArgumentParser(description="Emulate DQM@P5 environment and launch cmssw jobs.")
451  #parser.add_argument('-q', action='store_true', help="Don't write to stdout, just the log file.")
452  #parser.add_argument("log", type=str, help="Filename to write.", metavar="<logfile.gz>")
453 
454  parser.add_argument("--work", "-w", type=str, help="Working directory (used for inputs, outputs, monitoring and logs).", default="/tmp/pplay." + username)
455  parser.add_argument("--clean", "-c", action="store_true", help="Clean work directories (if they are not set).", default=False)
456  parser.add_argument("--dry", "-n", action="store_true", help="Do not execute, just init.", default=False)
457 
458  work_group = parser.add_argument_group('Paths', 'Path options for cmssw jobs, auto generated if not specified.')
459  for subdirectory in subdirectories:
460  work_group.add_argument("--work_%s" % subdirectory, type=str, help="Path for %s directory." % subdirectory, default=None)
461 
462  playback_group = parser.add_argument_group('Playback', 'Playback configuration/parameters.')
463  playback_group.add_argument("--playback", "-p", type=str, metavar="PLAYBACK_INPUT_DIR", help="Enable playback (emulate file delivery); otherwise point --work_ramdisk at existing input files.", default=None)
464  playback_group.add_argument("--playback_nlumi", type=int, help="Number of lumis to deliver, -1 for forever.", default=-1)
465  playback_group.add_argument("--playback_time_lumi", type=float, help="Number of seconds between lumisections.", default=23.3)
466 
467  run_group = parser.add_argument_group('Run', 'Run configuration/parameters.')
468  run_group.add_argument("--run", type=int, help="Run number, -1 for autodiscovery.", default=-1)
469  run_group.add_argument("--cmsRun", type=str, help="cmsRun command to run, for igprof and so on.", default="cmsRun")
470 
471  parser.add_argument('cmssw_configs', metavar='cmssw_cfg.py', type=str, nargs='*', help='List of cmssw jobs (clients).')
472 
473  args = parser.parse_args()
474 
475  if len(args.cmssw_configs) and args.cmssw_configs[0] == "--":
476  # compat with 2.6
477  args.cmssw_configs = args.cmssw_configs[1:]
478 
479  for subdirectory in subdirectories:
480  if getattr(args, "work_" + subdirectory) is None:
481  setattr(args, "work_" + subdirectory, os.path.join(args.work, subdirectory))
482 
483  path = getattr(args, "work_" + subdirectory)
484  if args.clean and os.path.isdir(path):
485  root_log.info("Removing directory: %s", path)
486  shutil.rmtree(path)
487 
488  path = getattr(args, "work_" + subdirectory)
489  if not os.path.isdir(path):
490  os.makedirs(path)
491 
492  root_log.info("Using directory: %s", path)
493 
494  print "*"*80
495  print args
496  print "*"*80
497 
498  applets = []
499 
500  if args.playback:
501  # launch playback service
502  playback = Playback("playback_emu", opts=args)
503  applets.append(playback)
504 
505  for cfg in args.cmssw_configs:
506  cfg_a = FrameworkJob("framework_job", opts=args, cfg_file=cfg)
507  applets.append(cfg_a)
508 
509  if len(applets) == 0:
510  sys.stderr.write("At least one process should be specified, use --playback and/or cmssw_configs options.\n")
511 
512  # serialize them into control directory
513  for a in applets:
514  fn = "%s_%s.pkl" % (a.name, hex(id(a)))
515  a.write(os.path.join(args.work_control, fn))
516 
517  if args.dry:
518  sys.exit(0)
519 
520  # launch each in a separate subprocess
521  for a in applets:
522  fp = a.control_fp
523 
524  args = [os.path.realpath(__file__), fp]
525  a.control_proc = subprocess.Popen(args, preexec_fn=preexec_kill_on_pdeath)
526 
527  for a in applets:
528  # wait till everything finishes
529  a.control_proc.wait()
530 
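# Usage sketch (paths are hypothetical; options as defined above), e.g.:
#   ./personalPlayback.py --work /tmp/pplay.$USER \
#       --playback /path/to/recorded/jsn_and_dat_files \
#       --playback_nlumi 10 beam_dqm_sourceclient-live_cfg.py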