CMS 3D CMS Logo

o2olib.py
Go to the documentation of this file.
1 from __future__ import print_function
2 __author__ = 'Giacomo Govi'
3 
4 import sqlalchemy
5 import sqlalchemy.ext.declarative
6 import subprocess
7 from datetime import datetime
8 import os
9 import sys
10 import logging
11 import string
12 import json
13 
14 import CondCore.Utilities.credentials as auth
15 
# Database services. Element 0 is the service used to build connection
# strings; element 1 is the entry handed to the credentials lookup.
prod_db_service = ['cms_orcon_prod','cms_orcon_prod/cms_cond_general_w']
dev_db_service = ['cms_orcoff_prep','cms_orcoff_prep/cms_test_conditions']
# Schema holding the O2O_* tables.
schema_name = 'CMS_CONDITIONS'
# Connection string templates: (user, password, service) and (service, schema).
sqlalchemy_tpl = 'oracle://%s:%s@%s'
coral_tpl = 'oracle://%s/%s'
# Database used when no service is given.
private_db = 'sqlite:///o2o_jobs.db'
# Status code stored for a run that has started but not yet ended.
startStatus = -1
# Names of the environment variables read by this module.
authPathEnvVar = 'COND_AUTH_PATH'
messageLevelEnvVar = 'O2O_LOG_LEVEL'
logFolderEnvVar = 'O2O_LOG_FOLDER'

# Common declarative base of the ORM classes below.
_Base = sqlalchemy.ext.declarative.declarative_base()

# Logging setup: INFO by default, DEBUG only when O2O_LOG_LEVEL is exactly 'DEBUG'.
fmt_str = "[%(asctime)s] %(levelname)s: %(message)s"
logFormatter = logging.Formatter(fmt_str)
logLevel = logging.INFO
try:
    levStr = os.environ[messageLevelEnvVar]
except KeyError:
    # Variable not set: keep the default level.
    pass
else:
    if levStr == 'DEBUG':
        logLevel = logging.DEBUG
36 
class O2OJob(_Base):
    """ORM mapping of the O2O_JOB table: one row per registered o2o job."""

    __tablename__ = 'O2O_JOB'
    __table_args__ = {'schema' : schema_name}

    # Job name; primary key.
    name = sqlalchemy.Column(
        sqlalchemy.String(100),
        primary_key=True,
    )
    # Integer flag; the job listing shows 'Y' when it equals 1.
    enabled = sqlalchemy.Column(
        sqlalchemy.Integer,
        nullable=False,
    )
    # Tag name; newly created jobs store the placeholder '-'.
    tag_name = sqlalchemy.Column(
        sqlalchemy.String(100),
        nullable=False,
    )
    # Execution interval of the job (units are not defined in this file).
    interval = sqlalchemy.Column(
        sqlalchemy.Integer,
        nullable=False,
    )
44 
class O2OJobConf(_Base):
    """ORM mapping of the O2O_JOB_CONF table: configuration versions of a job.

    A row is identified by the pair (job_name, insertion_time).
    """

    __tablename__ = 'O2O_JOB_CONF'
    __table_args__ = {'schema' : schema_name}

    # Owning job; references O2OJob.name.
    job_name = sqlalchemy.Column(
        sqlalchemy.ForeignKey(O2OJob.name),
        primary_key=True,
    )
    # Time at which this configuration version was inserted.
    insertion_time = sqlalchemy.Column(
        sqlalchemy.TIMESTAMP,
        primary_key=True,
    )
    # Configuration text (the managers in this file store JSON here).
    configuration = sqlalchemy.Column(
        sqlalchemy.String(4000),
        nullable=False,
    )

    job = sqlalchemy.orm.relationship(
        'O2OJob',
        primaryjoin="O2OJob.name==O2OJobConf.job_name",
    )
53 
class O2ORun(_Base):
    """ORM mapping of the O2O_RUN table: one row per execution of a job.

    A row is identified by the pair (job_name, start_time).
    """

    __tablename__ = 'O2O_RUN'
    __table_args__ = {'schema' : schema_name}

    # Executed job; references O2OJob.name.
    job_name = sqlalchemy.Column(
        sqlalchemy.ForeignKey(O2OJob.name),
        primary_key=True,
    )
    # Time at which the run was started.
    start_time = sqlalchemy.Column(
        sqlalchemy.TIMESTAMP,
        primary_key=True,
    )
    # Time at which the run ended; may be null.
    end_time = sqlalchemy.Column(
        sqlalchemy.TIMESTAMP,
        nullable=True,
    )
    # Status code of the run.
    status_code = sqlalchemy.Column(
        sqlalchemy.Integer,
        nullable=False,
    )
    # Log text of the run; may be null.
    log = sqlalchemy.Column(
        sqlalchemy.CLOB,
        nullable=True,
    )

    job = sqlalchemy.orm.relationship(
        'O2OJob',
        primaryjoin="O2OJob.name==O2ORun.job_name",
    )
64 
def get_db_credentials( db_service, authFile ):
    """Look up the user name and password for a database service.

    db_service -- service description; its second element is the entry
                  passed to the credentials lookup
    authFile   -- authentication file/path forwarded to the lookup

    Returns the pair (username, password). The account name returned by the
    lookup is discarded.
    """
    user, _account, password = auth.get_credentials( authPathEnvVar, db_service[1], authFile )
    return user, password
68 
def print_table( headers, table ):
    """Print a left-aligned text table with a dashed rule under the header.

    headers -- sequence of column titles; it fixes the number of columns
    table   -- iterable of rows, each an iterable of cell values

    Each column is as wide as its longest cell (or title) and is followed by
    a single space. Cells are converted with str(), rows shorter than the
    header are padded with empty cells, and cells beyond the last header
    column are ignored.
    """
    titles = [str(h) for h in headers]
    n_cols = len(titles)

    # Normalise every row to exactly n_cols text cells so that neither the
    # width computation nor the formatting can run past the end of a row.
    rows = []
    for row in table:
        cells = [str(c) for c in row][:n_cols]
        cells.extend([''] * (n_cols - len(cells)))
        rows.append(cells)

    widths = [len(t) for t in titles]
    for cells in rows:
        for ind, cell in enumerate(cells):
            if len(cell) > widths[ind]:
                widths[ind] = len(cell)

    def format_line( cells ):
        return ''.join(cell.ljust(w) + ' ' for cell, w in zip(cells, widths))

    print(format_line(titles))
    print(''.join('-' * w + ' ' for w in widths))
    for cells in rows:
        print(format_line(cells))
98 
class O2OMgr(object):
    """Base manager: sets up console logging and opens database sessions."""

    def __init__(self):
        """Configure the root logger to write to stdout and reset the engine."""
        self.logger = logging.getLogger()
        self.logger.setLevel(logLevel)
        # The root logger is shared by the whole process: attach the stdout
        # handler only once, otherwise every further manager instance would
        # duplicate each log line.
        already_attached = any(
            isinstance(handler, logging.StreamHandler)
            and handler.formatter is logFormatter
            and getattr(handler, 'stream', None) is sys.stdout
            for handler in self.logger.handlers
        )
        if not already_attached:
            consoleHandler = logging.StreamHandler(sys.stdout)
            consoleHandler.setFormatter(logFormatter)
            self.logger.addHandler(consoleHandler)
        self.eng = None

    def logger( self ):
        """Return the logger of this manager.

        On instances this method is shadowed by the 'logger' attribute set in
        __init__; it is reached through the class, as O2OMgr.logger( obj ).
        """
        return self.logger

    def getSession( self, db_service, auth ):
        """Open a database session.

        db_service -- service description (element 0: service name, element 1:
                      credentials entry), or None to use the private database
        auth       -- path of the authentication file/folder, or None

        Returns a scoped session, or None when the authentication path does
        not exist or the engine/session could not be created. Raises Exception
        when no credentials are available for the requested service.
        """
        if db_service is None:
            url = private_db
        else:
            self.logger.info('Getting credentials')
            if auth is not None and not os.path.exists(auth):
                self.logger.error('Authentication path %s is invalid.' %auth)
                return None
            try:
                username, pwd = get_db_credentials( db_service, auth )
            except Exception as e:
                # Best effort: any failure of the lookup is treated as
                # "no credentials" and reported just below.
                self.logger.debug(str(e))
                username = None
                pwd = None
            if username is None:
                self.logger.error('Credentials for service %s (machine=%s) are not available' %(db_service[0],db_service[1]))
                raise Exception("Cannot connect to db %s" %db_service[0] )
            url = sqlalchemy_tpl %(username,pwd,db_service[0])
        session = None
        try:
            self.eng = sqlalchemy.create_engine( url )
            session = sqlalchemy.orm.scoped_session( sqlalchemy.orm.sessionmaker(bind=self.eng))
        except sqlalchemy.exc.SQLAlchemyError as dberror:
            self.logger.error( str(dberror) )
        return session
138 
140 
141  def __init__( self ):
142  O2OMgr.__init__(self)
143  self.db_connection = None
144  self.conf_dict = {}
145 
146 
147  def readConfiguration( self, config_filename ):
148  config = ''
149  try:
150  with open( config_filename, 'r' ) as config_file:
151  config = config_file.read()
152  if config == '':
153  O2OMgr.logger( self).error( 'The file %s contains an empty string.', config_filename )
154  else:
155  json.loads(config)
156  except IOError as e:
157  O2OMgr.logger( self).error( 'The file %s cannot be open.', config_filename )
158  except ValueError as e:
159  config = ''
160  O2OMgr.logger( self).error( 'The file %s contains an invalid json string.', config_filename )
161  return config
162 
163  def connect( self, service, args ):
164  self.session = O2OMgr.getSession( self,service, args.auth )
165  self.verbose = args.verbose
166  if self.session is None:
167  return False
168  else:
169  self.db_connection = coral_tpl %(service[0],schema_name)
170  self.conf_dict['db']=self.db_connection
171  return True
172  def runManager( self ):
173  return O2ORunMgr( self.db_connection, self.session, O2OMgr.logger( self ) )
174 
175  def add( self, job_name, config_filename, int_val, en_flag ):
176  res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
177  enabled = None
178  for r in res:
179  enabled = r
180  if enabled:
181  O2OMgr.logger( self).error( "A job called '%s' exists already.", job_name )
182  return False
183  configJson = self.readConfiguration( config_filename )
184  if configJson == '':
185  return False
186  job = O2OJob(name=job_name,tag_name='-',enabled=en_flag,interval=int_val)
187  config = O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
188  self.session.add(job)
189  self.session.add(config)
190  self.session.commit()
191  O2OMgr.logger( self).info( "New o2o job '%s' created.", job_name )
192  return True
193 
194  def set( self, job_name, en_flag ):
195  res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
196  enabled = None
197  for r in res:
198  enabled = r
199  if enabled is None:
200  O2OMgr.logger( self).error( "A job called '%s' does not exist.", job_name )
201  return
202  job = O2OJob(name=job_name,enabled=en_flag)
203  action = 'enabled'
204  if not en_flag:
205  action = 'disabled'
206  self.session.merge(job)
207  self.session.commit()
208  O2OMgr.logger( self).info( "Job '%s' %s." %(job_name,action) )
209 
210  def setConfig( self, job_name, config_filename ):
211  res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
212  enabled = None
213  for r in res:
214  enabled = r
215  if enabled is None:
216  O2OMgr.logger( self).error( "A job called '%s' does not exist.", job_name )
217  return
218  configJson = self.readConfiguration( config_filename )
219  if configJson == '':
220  return False
221  config = O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
222  self.session.add(config)
223  self.session.commit()
224  O2OMgr.logger( self).info( "New configuration inserted for job '%s'", job_name )
225 
226  def setInterval( self, job_name, int_val ):
227  res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
228  enabled = None
229  for r in res:
230  enabled = r
231  if enabled is None:
232  O2OMgr.logger( self).error( "A job called '%s' does not exist.", job_name )
233  return
234  job = O2OJob(name=job_name,interval=int_val)
235  self.session.merge(job)
236  self.session.commit()
237  O2OMgr.logger( self).info( "The execution interval for job '%s' has been updated.", job_name )
238 
239  def migrateConfig( self ):
240  res = self.session.query(O2OJob.name,O2OJob.tag_name)
241  for r in res:
242  job_name = r[0]
243  tag = r[1]
244  if tag != '-':
245  configDict = {}
246  configDict['tag']=tag
247  configJson = json.dumps( configDict )
248  config = O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
249  self.session.add(config)
250  O2OMgr.logger( self).info( "Configuration for job '%s' inserted.", job_name )
251  self.session.commit()
252 
253  def listJobs( self ):
254  runs = {}
255  res = self.session.query(O2ORun.job_name,sqlalchemy.func.max(O2ORun.start_time)).group_by(O2ORun.job_name).order_by(O2ORun.job_name)
256  for r in res:
257  runs[r[0]] = str(r[1])
258  res = self.session.query(O2OJob.name, O2OJob.interval, O2OJob.enabled).order_by(O2OJob.name).all()
259  table = []
260  for r in res:
261  row = []
262  row.append(r[0]),
263  row.append('%5d' %r[1] )
264  enabled = 'Y' if (r[2]==1) else 'N'
265  row.append('%4s' %enabled )
266  lastRun = '-'
267  if r[0] in runs.keys():
268  lastRun = runs[r[0]]
269  row.append( lastRun )
270  table.append(row)
271  headers = ['Job name','Interval','Enabled','Last run start']
272  print_table( headers, table )
273 
274  def listConfig( self, jname ):
275  res = self.session.query(O2OJob.enabled).filter_by(name=jname)
276  enabled = None
277  for r in res:
278  enabled = r
279  if enabled is None:
280  O2OMgr.logger( self).error( "A job called '%s' does not exist.", jname )
281  return
282  res = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
283  configs = []
284  for r in res:
285  configs.append((str(r[0]),r[1]))
286  ind = len(configs)
287  if ind:
288  print("Configurations for job '%s'" %jname)
289  for cf in reversed(configs):
290  print('#%2d since: %s' %(ind,cf[1]))
291  print(cf[0])
292  ind -= 1
293  else:
294  O2OMgr.logger( self ).info("No configuration found for job '%s'" %jname )
295 
296  def dumpConfig( self, jname, versionIndex, configFile ):
297  versionIndex = int(versionIndex)
298  res = self.session.query(O2OJob.enabled).filter_by(name=jname)
299  enabled = None
300  for r in res:
301  enabled = r
302  if enabled is None:
303  O2OMgr.logger( self).error( "A job called '%s' does not exist.", jname )
304  return
305  res = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
306  configs = []
307  for r in res:
308  configs.append((str(r[0]),r[1]))
309  ind = len(configs)
310  if versionIndex>ind or versionIndex==0:
311  O2OMgr.logger( self ).error("Configuration for job %s with index %s has not been found." %(jname,versionIndex))
312  return
313  print("Configuration #%2d for job '%s'" %(versionIndex,jname))
314  config = configs[versionIndex-1]
315  print('#%2d since %s' %(versionIndex,config[1]))
316  print(config[0])
317  if configFile is None or configFile == '':
318  configFile = '%s_%s.json' %(jname,versionIndex)
319  with open(configFile,'w') as json_file:
320  json_file.write(config[0])
321 
322 
324 
325  def __init__( self, db_connection, session, logger ):
326  self.job_name = None
327  self.start = None
328  self.end = None
329  self.conf_dict = {}
330  self.conf_dict['db'] = db_connection
331  self.session = session
332  self.logger = logger
333 
334  def startJob( self, job_name ):
335  self.logger.info('Checking job %s', job_name)
336  exists = None
337  enabled = None
338  try:
339  res = self.session.query(O2OJob.enabled,O2OJob.tag_name).filter_by(name=job_name)
340  for r in res:
341  exists = True
342  enabled = int(r[0])
343  self.tag_name = str(r[1])
344  if exists is None:
345  self.logger.error( 'The job %s is unknown.', job_name )
346  return 2
347  if enabled:
348  res = self.session.query(O2OJobConf.configuration).filter_by(job_name=job_name).order_by(sqlalchemy.desc(O2OJobConf.insertion_time)).first()
349  conf = None
350  for r in res:
351  conf = str(r)
352  if conf is None:
353  self.logger.warning("No configuration found for job '%s'" %job_name )
354  else:
355  try:
356  self.conf_dict.update( json.loads(conf) )
357  self.logger.info('Using configuration: %s ' %conf)
358  except Exception as e:
359  self.logger.error( str(e) )
360  return 6
361  self.job_name = job_name
362  self.start = datetime.now()
363  run = O2ORun(job_name=self.job_name,start_time=self.start,status_code=startStatus)
364  self.session.add(run)
365  self.session.commit()
366  return 0
367  else:
368  self.logger.info( 'The job %s has been disabled.', job_name )
369  return 5
370  except sqlalchemy.exc.SQLAlchemyError as dberror:
371  self.logger.error( str(dberror) )
372  return 7
373  return -1
374 
375 
376  def endJob( self, status, log ):
377  self.end = datetime.now()
378  try:
379  run = O2ORun(job_name=self.job_name,start_time=self.start,end_time=self.end,status_code=status,log=log)
380  self.session.merge(run)
381  self.session.commit()
382  self.logger.info( 'Job %s ended.', self.job_name )
383  return 0
384  except sqlalchemy.exc.SQLAlchemyError as dberror:
385  self.logger.error( str(dberror) )
386  return 8
387 
388  def executeJob( self, args ):
389  job_name = args.name
390  command = args.executable
391  logFolder = os.getcwd()
392  if logFolderEnvVar in os.environ:
393  logFolder = os.environ[logFolderEnvVar]
394  datelabel = datetime.now().strftime("%y-%m-%d-%H-%M-%S")
395  logFileName = '%s-%s.log' %(job_name,datelabel)
396  logFile = os.path.join(logFolder,logFileName)
397  started = self.startJob( job_name )
398  if started !=0:
399  return started
400  ret = -1
401  try:
402  # replacing %([key])s placeholders...
403  command = command %(self.conf_dict)
404  #replacing {[key]} placeholders
405  command = command.format(**self.conf_dict )
406  except KeyError as exc:
407  self.logger.error( "Unresolved template key %s in the command." %str(exc) )
408  return 3
409  self.logger.info('Command: "%s"', command )
410  try:
411  self.logger.info('Executing command...' )
412  pipe = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
413  out = ''
414  for line in iter(pipe.stdout.readline, ''):
415  if args.verbose>=1:
416  sys.stdout.write(line)
417  sys.stdout.flush()
418  out += line
419  pipe.communicate()
420  self.logger.info( 'Command returned code: %s' %pipe.returncode )
421  ret = pipe.returncode
422  except Exception as e:
423  self.logger.error( str(e) )
424  return 4
425  ended = self.endJob( pipe.returncode, out )
426  if ended != 0:
427  ret = ended
428  with open(logFile,'a') as logF:
429  logF.write(out)
430  return ret
431 
432 import optparse
433 import argparse
434 
class O2OTool():
    """Command line front-end of the o2o job management."""

    def _build_parser(self):
        """Build the argument parser with one sub-parser per subcommand."""
        parser = argparse.ArgumentParser(description='CMS o2o command-line tool. For general help (manual page), use the help subcommand.')
        parser.add_argument('--db', type=str, help='The target database: pro ( for prod ) or dev ( for prep ). default=pro')
        parser.add_argument("--auth","-a", type=str, help="The path of the authentication file")
        # default=0: without it the count is None when -v is not given, and
        # comparing None with an integer fails on Python 3.
        parser.add_argument('--verbose', '-v', action='count', default=0, help='The verbosity level')
        parser_subparsers = parser.add_subparsers(title='Available subcommands')
        parser_create = parser_subparsers.add_parser('create', description='Create a new O2O job')
        parser_create.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_create.add_argument('--configFile', '-c', type=str, help='the JSON configuration file path',required=True)
        parser_create.add_argument('--interval', '-i', type=int, help='the chron job interval',default=0)
        parser_create.set_defaults(func=self.create)
        parser_setConfig = parser_subparsers.add_parser('setConfig', description='Set a new configuration for the specified job. The configuration is expected as a list of entries "param": "value" (dictionary). The "param" labels will be used to inject the values in the command to execute. The dictionary is stored in JSON format.')
        parser_setConfig.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_setConfig.add_argument('--configFile', '-c', type=str, help='the JSON configuration file path',required=True)
        parser_setConfig.set_defaults(func=self.setConfig)
        parser_setInterval = parser_subparsers.add_parser('setInterval',description='Set a new execution interval for the specified job')
        parser_setInterval.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_setInterval.add_argument('--interval', '-i', type=int, help='the chron job interval',required=True)
        parser_setInterval.set_defaults(func=self.setInterval)
        parser_enable = parser_subparsers.add_parser('enable',description='enable the O2O job')
        parser_enable.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_enable.set_defaults(func=self.enable)
        parser_disable = parser_subparsers.add_parser('disable',description='disable the O2O job')
        parser_disable.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_disable.set_defaults(func=self.disable)
        parser_migrateConf = parser_subparsers.add_parser('migrateConfig',description='migrate the tag info for the jobs in configuration entries')
        parser_migrateConf.set_defaults(func=self.migrate)
        parser_listJobs = parser_subparsers.add_parser('listJobs', description='list the registered jobs')
        parser_listJobs.set_defaults(func=self.listJobs)
        parser_listConf = parser_subparsers.add_parser('listConfig', description='shows the configurations for the specified job')
        parser_listConf.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_listConf.add_argument('--dump', type=int, help='Dump the specified config.',default=0)
        parser_listConf.set_defaults(func=self.listConf)
        parser_dumpConf = parser_subparsers.add_parser('dumpConfig', description='dumps a specific job configuration version')
        parser_dumpConf.add_argument('versionIndex', type=str,help='the version to dump')
        parser_dumpConf.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_dumpConf.add_argument('--configFile', '-c', type=str, help='the JSON configuration file name - default:[jobname]_[version].json')
        parser_dumpConf.set_defaults(func=self.dumpConf)
        parser_run = parser_subparsers.add_parser('run', description='Wrapper for O2O jobs execution. Supports input parameter injection from the configuration file associated to the job. The formatting syntax supported are the python ones: "command -paramName {paramLabel}" or "command -paramName %(paramLabel)s". where [paramName] is the name of the parameter required for the command, and [paramLabel] is the key of the parameter entry in the config dictionary (recommended to be equal for clarity!"')
        parser_run.add_argument('executable', type=str,help='command to execute')
        parser_run.add_argument('--name', '-n', type=str, help='The o2o job name',required=True)
        parser_run.set_defaults(func=self.run)
        return parser

    def execute(self):
        """Parse the command line and run the selected subcommand.

        With at least one -v the result of the subcommand is returned and
        exceptions propagate to the caller. Otherwise the process exits with
        the result of the subcommand, or with 1 after logging any exception.
        When the database connection cannot be set up, the subcommand is not
        run and the outcome is 1 (returned or used as exit code, as above).
        """
        parser = self._build_parser()
        args = parser.parse_args()
        if not hasattr(args, 'func'):
            # Sub-commands are optional for the Python 3 argparse: without
            # one, no handler has been selected.
            parser.error('too few arguments')

        if args.verbose >=1:
            if not self.setup(args):
                return 1
            return args.func()
        else:
            try:
                if not self.setup(args):
                    # The reason has been logged while connecting.
                    sys.exit(1)
                sys.exit( args.func())
            except Exception as e:
                logging.error(e)
                sys.exit(1)

    def setup(self, args):
        """Select the target database and connect the job manager.

        Production is used unless --db is 'dev' or 'oradev'. Returns the
        result of the connection attempt. Raises Exception for an unknown
        database label.
        """
        self.args = args
        db_service = prod_db_service
        if args.db is not None:
            if args.db in ('dev', 'oradev'):
                db_service = dev_db_service
            elif args.db not in ('orapro', 'onlineorapro', 'pro'):
                raise Exception("Database '%s' is not known." %args.db )

        self.mgr = O2OJobMgr()
        return self.mgr.connect( db_service, args )

    def create(self):
        """Subcommand 'create': register a new, enabled job."""
        self.mgr.add( self.args.name, self.args.configFile, self.args.interval, True )

    def setConfig(self):
        """Subcommand 'setConfig': insert a new configuration for a job."""
        self.mgr.setConfig( self.args.name, self.args.configFile )

    def setInterval(self):
        """Subcommand 'setInterval': change the execution interval of a job."""
        self.mgr.setInterval( self.args.name, self.args.interval )

    def enable(self):
        """Subcommand 'enable': enable a job."""
        self.mgr.set( self.args.name, True )

    def disable(self):
        """Subcommand 'disable': disable a job."""
        self.mgr.set( self.args.name, False )

    def migrate(self):
        """Subcommand 'migrateConfig': turn job tags into configuration entries."""
        self.mgr.migrateConfig()

    def listJobs(self):
        """Subcommand 'listJobs': print the registered jobs."""
        self.mgr.listJobs()

    def listConf(self):
        """Subcommand 'listConfig': print the configurations of a job."""
        self.mgr.listConfig( self.args.name )

    def dumpConf(self):
        """Subcommand 'dumpConfig': print and save one configuration version."""
        self.mgr.dumpConfig( self.args.name, self.args.versionIndex, self.args.configFile )

    def run(self):
        """Subcommand 'run': execute the job command; returns its outcome code."""
        rmgr = self.mgr.runManager()
        return rmgr.executeJob( self.args )
def listJobs(self)
Definition: o2olib.py:253
def disable(self)
Definition: o2olib.py:517
def listJobs(self)
Definition: o2olib.py:523
def setup(self, args)
Definition: o2olib.py:493
static const TGPicture * info(bool iBackgroundIsBlack)
def setConfig(self, job_name, config_filename)
Definition: o2olib.py:210
def endJob(self, status, log)
Definition: o2olib.py:376
def dumpConf(self)
Definition: o2olib.py:529
def add(self, job_name, config_filename, int_val, en_flag)
Definition: o2olib.py:175
def listConfig(self, jname)
Definition: o2olib.py:274
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
def __init__(self)
Definition: o2olib.py:100
def setInterval(self, job_name, int_val)
Definition: o2olib.py:226
def setInterval(self)
Definition: o2olib.py:511
def migrate(self)
Definition: o2olib.py:520
def runManager(self)
Definition: o2olib.py:172
def connect(self, service, args)
Definition: o2olib.py:163
def executeJob(self, args)
Definition: o2olib.py:388
def readConfiguration(self, config_filename)
Definition: o2olib.py:147
def __init__(self)
Definition: o2olib.py:141
def set(self, job_name, en_flag)
Definition: o2olib.py:194
def startJob(self, job_name)
Definition: o2olib.py:334
def run(self)
Definition: o2olib.py:532
def getSession(self, db_service, auth)
Definition: o2olib.py:111
def migrateConfig(self)
Definition: o2olib.py:239
def __init__(self, db_connection, session, logger)
Definition: o2olib.py:325
def enable(self)
Definition: o2olib.py:514
def dumpConfig(self, jname, versionIndex, configFile)
Definition: o2olib.py:296
def print_table(headers, table)
Definition: o2olib.py:69
#define str(s)
def create(self)
Definition: o2olib.py:505
def get_db_credentials(db_service, authFile)
Definition: o2olib.py:65
def setConfig(self)
Definition: o2olib.py:508
def listConf(self)
Definition: o2olib.py:526
def execute(self)
Definition: o2olib.py:437