CMS 3D CMS Logo

o2olib.py
Go to the documentation of this file.
1 __author__ = 'Giacomo Govi'
2 
3 import sqlalchemy
4 import sqlalchemy.ext.declarative
5 import subprocess
6 from datetime import datetime
7 import os
8 import sys
9 import logging
10 import string
11 import json
12 
13 import CondCore.Utilities.credentials as auth
14 
# Database services as two-element lists: element 0 is used to build the
# connection strings, element 1 is the key handed to the credentials lookup.
prod_db_service = ['cms_orcon_prod', 'cms_orcon_prod/cms_cond_general_w']
dev_db_service = ['cms_orcoff_prep', 'cms_orcoff_prep/cms_test_conditions']

# Schema holding the O2O_* tables.
schema_name = 'CMS_CONDITIONS'

# Connection string templates: (user, password, service) and (service, schema).
sqlalchemy_tpl = 'oracle://%s:%s@%s'
coral_tpl = 'oracle://%s/%s'

# Local database used when no service is specified.
private_db = 'sqlite:///o2o_jobs.db'

# Status code stored for a run that has been started but not yet ended.
startStatus = -1

# Names of the environment variables read by this module.
authPathEnvVar = 'COND_AUTH_PATH'
messageLevelEnvVar = 'O2O_LOG_LEVEL'
logFolderEnvVar = 'O2O_LOG_FOLDER'

# Common declarative base of the ORM classes below.
_Base = sqlalchemy.ext.declarative.declarative_base()

# Logging setup: INFO unless the level variable is set to exactly 'DEBUG'.
fmt_str = "[%(asctime)s] %(levelname)s: %(message)s"
logLevel = logging.INFO
if messageLevelEnvVar in os.environ:
    levStr = os.environ[messageLevelEnvVar]
    logLevel = logging.DEBUG if levStr == 'DEBUG' else logLevel
logFormatter = logging.Formatter(fmt_str)
35 
class O2OJob(_Base):
    """ORM mapping of the O2O_JOB table (one row per registered job)."""

    __tablename__ = 'O2O_JOB'
    __table_args__ = dict(schema=schema_name)

    # Job name, used as the primary key.
    name = sqlalchemy.Column(
        sqlalchemy.String(100), primary_key=True)
    # Integer flag; the job listing shows 'Y' when it equals 1.
    enabled = sqlalchemy.Column(
        sqlalchemy.Integer, nullable=False)
    # Tag name; newly created jobs store '-' here.
    tag_name = sqlalchemy.Column(
        sqlalchemy.String(100), nullable=False)
    # Execution interval of the job.
    interval = sqlalchemy.Column(
        sqlalchemy.Integer, nullable=False)
43 
class O2OJobConf(_Base):
    """ORM mapping of the O2O_JOB_CONF table (configuration history of a job)."""

    __tablename__ = 'O2O_JOB_CONF'
    __table_args__ = dict(schema=schema_name)

    # Composite primary key: owning job plus insertion timestamp.
    job_name = sqlalchemy.Column(
        sqlalchemy.ForeignKey(O2OJob.name), primary_key=True)
    insertion_time = sqlalchemy.Column(
        sqlalchemy.TIMESTAMP, primary_key=True)
    # Configuration text (stored as a JSON string by the managers below).
    configuration = sqlalchemy.Column(
        sqlalchemy.String(4000), nullable=False)

    job = sqlalchemy.orm.relationship(
        'O2OJob', primaryjoin="O2OJob.name==O2OJobConf.job_name")
52 
class O2ORun(_Base):
    """ORM mapping of the O2O_RUN table (one row per job execution)."""

    __tablename__ = 'O2O_RUN'
    __table_args__ = dict(schema=schema_name)

    # Composite primary key: owning job plus start timestamp.
    job_name = sqlalchemy.Column(
        sqlalchemy.ForeignKey(O2OJob.name), primary_key=True)
    start_time = sqlalchemy.Column(
        sqlalchemy.TIMESTAMP, primary_key=True)
    # Nullable: filled in only when the run is ended.
    end_time = sqlalchemy.Column(
        sqlalchemy.TIMESTAMP, nullable=True)
    status_code = sqlalchemy.Column(
        sqlalchemy.Integer, nullable=False)
    # Captured output of the executed command.
    log = sqlalchemy.Column(
        sqlalchemy.CLOB, nullable=True)

    job = sqlalchemy.orm.relationship(
        'O2OJob', primaryjoin="O2OJob.name==O2ORun.job_name")
63 
def get_db_credentials( db_service, authFile ):
    """Return the (username, password) pair for a database service.

    The lookup key is the second element of ``db_service``; the account
    element returned by the credentials module is discarded.
    """
    credentials = auth.get_credentials( authPathEnvVar, db_service[1], authFile )
    username, _account, pwd = credentials
    return username, pwd
67 
def print_table( headers, table ):
    """Print a left-aligned text table on standard output.

    Each column is as wide as its longest cell (header included). Every cell,
    including the last one of a line, is followed by one space, and a dashed
    separator line is printed under the headers.

    Args:
        headers: sequence of column titles; its length fixes the column count.
        table: iterable of rows. Cells are converted with ``str``; rows shorter
            than ``headers`` are padded with empty cells, extra cells are
            ignored.
    """
    n_cols = len(headers)
    titles = [str(h) for h in headers]
    # Normalise once so that widths and output are computed on the same data.
    rows = [[str(cell) for cell in row] for row in table]

    widths = [len(t) for t in titles]
    for row in rows:
        for ind, cell in enumerate(row[:n_cols]):
            widths[ind] = max(widths[ind], len(cell))

    def _format( cells ):
        # Pad short rows so that every column is always emitted.
        padded = cells[:n_cols] + [''] * (n_cols - len(cells))
        return ''.join('{:<{w}} '.format(c, w=w) for c, w in zip(padded, widths))

    # print(x) with a single argument behaves identically on Python 2 and 3.
    print(_format( titles ))
    print(''.join('-' * w + ' ' for w in widths))
    for row in rows:
        print(_format( row ))
97 
class O2OMgr(object):
    """Base manager: sets up console logging and opens database sessions."""

    def __init__(self):
        """Configure the root logger and attach a stdout handler once."""
        self.logger = logging.getLogger()
        self.logger.setLevel(logLevel)
        # The root logger is shared by every instance: attach the console
        # handler only if the one carrying this module's formatter is not
        # already installed, otherwise each message is printed repeatedly.
        if not any(h.formatter is logFormatter for h in self.logger.handlers):
            consoleHandler = logging.StreamHandler(sys.stdout)
            consoleHandler.setFormatter(logFormatter)
            self.logger.addHandler(consoleHandler)
        self.eng = None

    def logger( self ):
        """Return the logger of the instance.

        The instance attribute set in ``__init__`` shadows this method, so it
        is reached through the class: ``O2OMgr.logger( obj )``.
        """
        return self.logger

    def getSession( self, db_service, auth ):
        """Open a session on the requested database.

        Args:
            db_service: two-element service description, or None to use the
                private local database.
            auth: path of the authentication file, or None.

        Returns:
            A scoped session, or None when ``auth`` does not exist on disk or
            when the engine/session cannot be created.

        Raises:
            Exception: when no credentials are available for the service.
        """
        url = None
        if db_service is None:
            url = private_db
        else:
            self.logger.info('Getting credentials')
            if auth is not None and not os.path.exists(auth):
                self.logger.error('Authentication path %s is invalid.' %auth)
                return None
            try:
                username, pwd = get_db_credentials( db_service, auth )
            except Exception as e:
                # Best effort: a failed lookup is reported just below.
                self.logger.debug(str(e))
                username = None
                pwd = None
            if username is None:
                self.logger.error('Credentials for service %s (machine=%s) are not available' %(db_service[0],db_service[1]))
                raise Exception("Cannot connect to db %s" %db_service[0] )
            url = sqlalchemy_tpl %(username,pwd,db_service[0])
        session = None
        try:
            self.eng = sqlalchemy.create_engine( url )
            session = sqlalchemy.orm.scoped_session( sqlalchemy.orm.sessionmaker(bind=self.eng))
        except sqlalchemy.exc.SQLAlchemyError as dberror:
            self.logger.error( str(dberror) )
        return session
137 
139 
def __init__( self ):
    """Initialise the base manager and start with no database connection."""
    O2OMgr.__init__(self)
    # Parameters made available for command injection; 'db' is added on connect.
    self.conf_dict = {}
    self.db_connection = None
144 
145 
def readConfiguration( self, config_filename ):
    """Read a JSON configuration file and return its raw text.

    Returns:
        The file content when it parses as JSON. An empty string when the
        file cannot be opened, is empty, or does not contain valid JSON; each
        of these cases is logged as an error.
    """
    content = ''
    try:
        with open( config_filename, 'r' ) as handle:
            content = handle.read()
        if content != '':
            # Parsed for validation only; the text itself is what is returned.
            json.loads(content)
        else:
            O2OMgr.logger( self).error( 'The file %s contains an empty string.', config_filename )
    except IOError:
        O2OMgr.logger( self).error( 'The file %s cannot be open.', config_filename )
    except ValueError:
        content = ''
        O2OMgr.logger( self).error( 'The file %s contains an invalid json string.', config_filename )
    return content
161 
def connect( self, service, args ):
    """Open the session for ``service`` and record the connection string.

    Returns:
        True when a session was obtained, False otherwise.
    """
    self.session = O2OMgr.getSession( self, service, args.auth )
    self.verbose = args.verbose
    if self.session is None:
        return False
    # Connection string passed to the executed commands through the 'db' key.
    connection = coral_tpl %(service[0],schema_name)
    self.db_connection = connection
    self.conf_dict['db'] = connection
    return True
def runManager( self ):
    """Build a run manager sharing this manager's connection, session and logger."""
    shared_logger = O2OMgr.logger( self )
    return O2ORunMgr( self.db_connection, self.session, shared_logger )
173 
def add( self, job_name, config_filename, int_val, en_flag ):
    """Register a new job together with its first configuration.

    Returns:
        True when the job was created; False when a job with that name
        already exists or the configuration file could not be used.
    """
    existing = None
    for record in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        existing = record
    if existing:
        O2OMgr.logger( self).error( "A job called '%s' exists already.", job_name )
        return False
    configJson = self.readConfiguration( config_filename )
    if configJson == '':
        return False
    # New jobs carry '-' as tag name.
    self.session.add( O2OJob(name=job_name,tag_name='-',enabled=en_flag,interval=int_val) )
    self.session.add( O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson ) )
    self.session.commit()
    O2OMgr.logger( self).info( "New o2o job '%s' created.", job_name )
    return True
192 
def set( self, job_name, en_flag ):
    """Enable or disable an existing job; logs an error if the job is unknown."""
    found = None
    for record in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        found = record
    if found is None:
        O2OMgr.logger( self).error( "A job called '%s' does not exist.", job_name )
        return
    action = 'enabled' if en_flag else 'disabled'
    # merge() applies the new flag to the row with the same primary key.
    self.session.merge( O2OJob(name=job_name,enabled=en_flag) )
    self.session.commit()
    O2OMgr.logger( self).info( "Job '%s' %s." %(job_name,action) )
208 
def setConfig( self, job_name, config_filename ):
    """Store a new configuration version for an existing job.

    Returns:
        None when the job is unknown or when the configuration was stored;
        False when the configuration file could not be used.
    """
    found = None
    for record in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        found = record
    if found is None:
        O2OMgr.logger( self).error( "A job called '%s' does not exist.", job_name )
        return
    configJson = self.readConfiguration( config_filename )
    if configJson == '':
        return False
    # Each configuration is a new row keyed by its insertion time.
    self.session.add( O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson ) )
    self.session.commit()
    O2OMgr.logger( self).info( "New configuration inserted for job '%s'", job_name )
224 
def setInterval( self, job_name, int_val ):
    """Update the execution interval of an existing job; logs an error if unknown."""
    found = None
    for record in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        found = record
    if found is None:
        O2OMgr.logger( self).error( "A job called '%s' does not exist.", job_name )
        return
    # merge() applies the new interval to the row with the same primary key.
    self.session.merge( O2OJob(name=job_name,interval=int_val) )
    self.session.commit()
    O2OMgr.logger( self).info( "The execution interval for job '%s' has been updated.", job_name )
237 
def migrateConfig( self ):
    """Create a configuration entry holding the tag for every job with a real tag.

    Jobs whose tag name is '-' are skipped. All insertions are committed
    together at the end.
    """
    for record in self.session.query(O2OJob.name,O2OJob.tag_name):
        job_name, tag = record[0], record[1]
        if tag == '-':
            continue
        configJson = json.dumps( {'tag': tag} )
        self.session.add( O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson ) )
        O2OMgr.logger( self).info( "Configuration for job '%s' inserted.", job_name )
    self.session.commit()
251 
def listJobs( self ):
    """Print a table of the registered jobs with the start time of their last run."""
    latest = self.session.query(O2ORun.job_name,sqlalchemy.func.max(O2ORun.start_time)).group_by(O2ORun.job_name).order_by(O2ORun.job_name)
    # Job name -> start time of the most recent run, as text.
    last_start = {}
    for record in latest:
        last_start[record[0]] = str(record[1])
    jobs = self.session.query(O2OJob.name, O2OJob.interval, O2OJob.enabled).order_by(O2OJob.name).all()
    table = []
    for record in jobs:
        flag = 'Y' if (record[2]==1) else 'N'
        table.append( [ record[0],
                        '%5d' %record[1],
                        '%4s' %flag,
                        # '-' marks a job that has never been run.
                        last_start.get( record[0], '-' ) ] )
    print_table( ['Job name','Interval','Enabled','Last run start'], table )
272 
def listConfig( self, jname ):
    """Print every configuration of a job, most recent first.

    Versions are numbered from 1 (oldest) upwards. An error is logged when
    the job does not exist.
    """
    found = None
    for record in self.session.query(O2OJob.enabled).filter_by(name=jname):
        found = record
    if found is None:
        O2OMgr.logger( self).error( "A job called '%s' does not exist.", jname )
        return
    history = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
    configs = [ (str(record[0]),record[1]) for record in history ]
    if not configs:
        O2OMgr.logger( self ).info("No configuration found for job '%s'" %jname )
        return
    print("Configurations for job '%s'" %jname)
    version = len(configs)
    for text, since in reversed(configs):
        print('#%2d since: %s' %(version,since))
        print(text)
        version -= 1
294 
def dumpConfig( self, jname, versionIndex, configFile ):
    """Print one configuration version of a job and write it to a file.

    Args:
        jname: name of the job.
        versionIndex: 1-based version number (1 is the oldest configuration);
            anything accepted by ``int``.
        configFile: output file name; when None or empty the name
            '<jname>_<versionIndex>.json' is used.

    Raises:
        ValueError: when ``versionIndex`` cannot be converted to an integer.
    """
    versionIndex = int(versionIndex)
    found = None
    for record in self.session.query(O2OJob.enabled).filter_by(name=jname):
        found = record
    if found is None:
        O2OMgr.logger( self).error( "A job called '%s' does not exist.", jname )
        return
    history = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
    configs = [ (str(record[0]),record[1]) for record in history ]
    # Reject zero AND negative indexes: a negative value would otherwise be
    # accepted and silently select a version counted from the end.
    if versionIndex < 1 or versionIndex > len(configs):
        O2OMgr.logger( self ).error("Configuration for job %s with index %s has not been found." %(jname,versionIndex))
        return
    print("Configuration #%2d for job '%s'" %(versionIndex,jname))
    config = configs[versionIndex-1]
    print('#%2d since %s' %(versionIndex,config[1]))
    print(config[0])
    if configFile is None or configFile == '':
        configFile = '%s_%s.json' %(jname,versionIndex)
    with open(configFile,'w') as json_file:
        json_file.write(config[0])
320 
321 
323 
def __init__( self, db_connection, session, logger ):
    """Keep the shared session and logger; no job is selected yet."""
    self.logger = logger
    self.session = session
    # Parameters injected in the command; 'db' is always available.
    self.conf_dict = { 'db': db_connection }
    self.job_name = None
    self.start = None
    self.end = None
332 
def startJob( self, job_name ):
    """Check a job, load its latest configuration and register a new run.

    Returns:
        0 when the run has been registered,
        2 when the job is unknown,
        5 when the job is disabled,
        6 when the stored configuration cannot be loaded,
        7 on a database error.
    """
    self.logger.info('Checking job %s', job_name)
    exists = None
    enabled = None
    try:
        res = self.session.query(O2OJob.enabled,O2OJob.tag_name).filter_by(name=job_name)
        for r in res:
            exists = True
            enabled = int(r[0])
            self.tag_name = str(r[1])
        if exists is None:
            self.logger.error( 'The job %s is unknown.', job_name )
            return 2
        if not enabled:
            self.logger.info( 'The job %s has been disabled.', job_name )
            return 5
        latest = self.session.query(O2OJobConf.configuration).filter_by(job_name=job_name).order_by(sqlalchemy.desc(O2OJobConf.insertion_time)).first()
        # first() yields None when the job has no configuration at all;
        # iterating that result would raise instead of reaching the warning.
        conf = None if latest is None else str(latest[0])
        if conf is None:
            self.logger.warning("No configuration found for job '%s'" %job_name )
        else:
            try:
                self.conf_dict.update( json.loads(conf) )
                self.logger.info('Using configuration: %s ' %conf)
            except Exception as e:
                self.logger.error( str(e) )
                return 6
        self.job_name = job_name
        self.start = datetime.now()
        # The run is stored with the "started" status until endJob updates it.
        run = O2ORun(job_name=self.job_name,start_time=self.start,status_code=startStatus)
        self.session.add(run)
        self.session.commit()
        return 0
    except sqlalchemy.exc.SQLAlchemyError as dberror:
        self.logger.error( str(dberror) )
        return 7
373 
374 
def endJob( self, status, log ):
    """Record end time, status code and log of the current run.

    Returns:
        0 on success, 8 on a database error.
    """
    self.end = datetime.now()
    try:
        # Same primary key as the row added at start: merge() updates it.
        finished = O2ORun(job_name=self.job_name,start_time=self.start,end_time=self.end,status_code=status,log=log)
        self.session.merge(finished)
        self.session.commit()
    except sqlalchemy.exc.SQLAlchemyError as dberror:
        self.logger.error( str(dberror) )
        return 8
    self.logger.info( 'Job %s ended.', self.job_name )
    return 0
386 
def executeJob( self, args ):
    """Run the command of a job and record its outcome.

    The command (``args.executable``) is first expanded with the job
    parameters, using both the ``%(key)s`` and the ``{key}`` syntax, then
    executed through the shell. Its output is echoed when ``args.verbose``
    is at least 1, stored with the run and appended to a log file.

    Returns:
        The code returned by startJob when the job cannot be started,
        3 on an unresolved template key, 4 when the execution fails,
        the code returned by endJob when it is not 0, otherwise the exit
        code of the command.
    """
    job_name = args.name
    command = args.executable
    # Log folder: the environment variable wins over the working directory.
    logFolder = os.getcwd()
    if logFolderEnvVar in os.environ:
        logFolder = os.environ[logFolderEnvVar]
    datelabel = datetime.now().strftime("%y-%m-%d-%H-%M-%S")
    logFile = os.path.join(logFolder, '%s-%s.log' %(job_name,datelabel))
    started = self.startJob( job_name )
    if started !=0:
        return started
    ret = -1
    try:
        # replacing %([key])s placeholders...
        command = command %(self.conf_dict)
        #replacing {[key]} placeholders
        command = command.format(**self.conf_dict )
    except KeyError as exc:
        self.logger.error( "Unresolved template key %s in the command." %str(exc) )
        return 3
    self.logger.info('Command: "%s"', command )
    try:
        self.logger.info('Executing command...' )
        pipe = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
        out = ''
        # Read line by line so the output can be echoed while the command runs.
        for line in iter(pipe.stdout.readline, ''):
            if args.verbose>=1:
                sys.stdout.write(line)
                sys.stdout.flush()
            out += line
        pipe.communicate()
        self.logger.info( 'Command returned code: %s' %pipe.returncode )
        ret = pipe.returncode
    except Exception as e:
        self.logger.error( str(e) )
        return 4
    ended = self.endJob( pipe.returncode, out )
    ret = ret if ended == 0 else ended
    with open(logFile,'a') as logF:
        logF.write(out)
    return ret
430 
431 import optparse
432 import argparse
433 
class O2OTool():
    """Command-line front end of the o2o job manager."""

    def execute(self):
        """Parse the command line and run the selected subcommand.

        With at least one --verbose flag, exceptions propagate and the result
        of the subcommand is returned (1 when the database connection cannot
        be established). Otherwise the process exits with the result of the
        subcommand, or with 1 after logging any error.
        """
        parser = argparse.ArgumentParser(description='CMS o2o command-line tool. For general help (manual page), use the help subcommand.')
        parser.add_argument('--db', type=str, help='The target database: pro ( for prod ) or dev ( for prep ). default=pro')
        parser.add_argument("--auth","-a", type=str, help="The path of the authentication file")
        # default=0: without it the attribute is None when the flag is absent,
        # and comparing None with an integer only works on Python 2.
        parser.add_argument('--verbose', '-v', action='count', default=0, help='The verbosity level')
        parser_subparsers = parser.add_subparsers(title='Available subcommands')
        parser_create = parser_subparsers.add_parser('create', description='Create a new O2O job')
        parser_create.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_create.add_argument('--configFile', '-c', type=str, help='the JSON configuration file path',required=True)
        parser_create.add_argument('--interval', '-i', type=int, help='the chron job interval',default=0)
        parser_create.set_defaults(func=self.create)
        parser_setConfig = parser_subparsers.add_parser('setConfig', description='Set a new configuration for the specified job. The configuration is expected as a list of entries "param": "value" (dictionary). The "param" labels will be used to inject the values in the command to execute. The dictionary is stored in JSON format.')
        parser_setConfig.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_setConfig.add_argument('--configFile', '-c', type=str, help='the JSON configuration file path',required=True)
        parser_setConfig.set_defaults(func=self.setConfig)
        parser_setInterval = parser_subparsers.add_parser('setInterval',description='Set a new execution interval for the specified job')
        parser_setInterval.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_setInterval.add_argument('--interval', '-i', type=int, help='the chron job interval',required=True)
        parser_setInterval.set_defaults(func=self.setInterval)
        parser_enable = parser_subparsers.add_parser('enable',description='enable the O2O job')
        parser_enable.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_enable.set_defaults(func=self.enable)
        parser_disable = parser_subparsers.add_parser('disable',description='disable the O2O job')
        parser_disable.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_disable.set_defaults(func=self.disable)
        parser_migrateConf = parser_subparsers.add_parser('migrateConfig',description='migrate the tag info for the jobs in configuration entries')
        parser_migrateConf.set_defaults(func=self.migrate)
        parser_listJobs = parser_subparsers.add_parser('listJobs', description='list the registered jobs')
        parser_listJobs.set_defaults(func=self.listJobs)
        parser_listConf = parser_subparsers.add_parser('listConfig', description='shows the configurations for the specified job')
        parser_listConf.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_listConf.add_argument('--dump', type=int, help='Dump the specified config.',default=0)
        parser_listConf.set_defaults(func=self.listConf)
        parser_dumpConf = parser_subparsers.add_parser('dumpConfig', description='dumps a specific job configuration version')
        parser_dumpConf.add_argument('versionIndex', type=str,help='the version to dump')
        parser_dumpConf.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_dumpConf.add_argument('--configFile', '-c', type=str, help='the JSON configuration file name - default:[jobname]_[version].json')
        parser_dumpConf.set_defaults(func=self.dumpConf)
        parser_run = parser_subparsers.add_parser('run', description='Wrapper for O2O jobs execution. Supports input parameter injection from the configuration file associated to the job. The formatting syntax supported are the python ones: "command -paramName {paramLabel}" or "command -paramName %(paramLabel)s". where [paramName] is the name of the parameter required for the command, and [paramLabel] is the key of the parameter entry in the config dictionary (recommended to be equal for clarity!"')
        parser_run.add_argument('executable', type=str,help='command to execute')
        parser_run.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_run.set_defaults(func=self.run)

        args = parser.parse_args()

        if args.verbose >=1:
            # Verbose mode: let exceptions propagate with their traceback.
            if not self.setup(args):
                logging.error('Could not connect to the database.')
                return 1
            return args.func()
        else:
            try:
                # Every subcommand needs the session: stop here when the
                # connection failed instead of failing later on a None session.
                if not self.setup(args):
                    logging.error('Could not connect to the database.')
                    sys.exit(1)
                sys.exit( args.func())
            except Exception as e:
                logging.error(e)
                sys.exit(1)

    def setup(self, args):
        """Select the target database and connect the job manager.

        Returns:
            The result of the connection attempt (True on success).

        Raises:
            Exception: when the --db value is not a known database.
        """
        self.args = args
        db_service = prod_db_service
        if args.db is not None:
            if args.db == 'dev' or args.db == 'oradev' :
                db_service = dev_db_service
            elif args.db != 'orapro' and args.db != 'onlineorapro' and args.db != 'pro':
                raise Exception("Database '%s' is not known." %args.db )

        self.mgr = O2OJobMgr()
        return self.mgr.connect( db_service, args )

    def create(self):
        """Subcommand 'create': register a new, enabled job."""
        self.mgr.add( self.args.name, self.args.configFile, self.args.interval, True )

    def setConfig(self):
        """Subcommand 'setConfig': store a new configuration for a job."""
        self.mgr.setConfig( self.args.name, self.args.configFile )

    def setInterval(self):
        """Subcommand 'setInterval': change the execution interval of a job."""
        self.mgr.setInterval( self.args.name, self.args.interval )

    def enable(self):
        """Subcommand 'enable': enable a job."""
        self.mgr.set( self.args.name, True )

    def disable(self):
        """Subcommand 'disable': disable a job."""
        self.mgr.set( self.args.name, False )

    def migrate(self):
        """Subcommand 'migrateConfig': turn job tags into configuration entries."""
        self.mgr.migrateConfig()

    def listJobs(self):
        """Subcommand 'listJobs': print the registered jobs."""
        self.mgr.listJobs()

    def listConf(self):
        """Subcommand 'listConfig': print the configurations of a job."""
        self.mgr.listConfig( self.args.name )

    def dumpConf(self):
        """Subcommand 'dumpConfig': print and save one configuration version."""
        self.mgr.dumpConfig( self.args.name, self.args.versionIndex, self.args.configFile )

    def run(self):
        """Subcommand 'run': execute the command of a job and return its code."""
        rmgr = self.mgr.runManager()
        return rmgr.executeJob( self.args )
def listJobs(self)
Definition: o2olib.py:252
def disable(self)
Definition: o2olib.py:516
def listJobs(self)
Definition: o2olib.py:522
def setup(self, args)
Definition: o2olib.py:492
static const TGPicture * info(bool iBackgroundIsBlack)
def setConfig(self, job_name, config_filename)
Definition: o2olib.py:209
def endJob(self, status, log)
Definition: o2olib.py:375
def dumpConf(self)
Definition: o2olib.py:528
def add(self, job_name, config_filename, int_val, en_flag)
Definition: o2olib.py:174
def listConfig(self, jname)
Definition: o2olib.py:273
def __init__(self)
Definition: o2olib.py:99
def setInterval(self, job_name, int_val)
Definition: o2olib.py:225
def setInterval(self)
Definition: o2olib.py:510
def migrate(self)
Definition: o2olib.py:519
def runManager(self)
Definition: o2olib.py:171
def connect(self, service, args)
Definition: o2olib.py:162
def executeJob(self, args)
Definition: o2olib.py:387
def readConfiguration(self, config_filename)
Definition: o2olib.py:146
def __init__(self)
Definition: o2olib.py:140
def set(self, job_name, en_flag)
Definition: o2olib.py:193
def startJob(self, job_name)
Definition: o2olib.py:333
def run(self)
Definition: o2olib.py:531
def getSession(self, db_service, auth)
Definition: o2olib.py:110
def migrateConfig(self)
Definition: o2olib.py:238
def __init__(self, db_connection, session, logger)
Definition: o2olib.py:324
def enable(self)
Definition: o2olib.py:513
def dumpConfig(self, jname, versionIndex, configFile)
Definition: o2olib.py:295
def print_table(headers, table)
Definition: o2olib.py:68
def create(self)
Definition: o2olib.py:504
def get_db_credentials(db_service, authFile)
Definition: o2olib.py:64
def setConfig(self)
Definition: o2olib.py:507
def listConf(self)
Definition: o2olib.py:525
def execute(self)
Definition: o2olib.py:436