1 __author__ =
'Giacomo Govi' 4 import sqlalchemy.ext.declarative
6 from datetime
import datetime
13 import CondCore.Utilities.credentials
as auth
15 prod_db_service = [
'cms_orcon_prod',
'cms_orcon_prod/cms_cond_general_w']
16 dev_db_service = [
'cms_orcoff_prep',
'cms_orcoff_prep/cms_test_conditions']
17 schema_name =
'CMS_CONDITIONS' 18 sqlalchemy_tpl =
'oracle://%s:%s@%s' 19 coral_tpl =
'oracle://%s/%s' 20 private_db =
'sqlite:///o2o_jobs.db' 22 authPathEnvVar =
'COND_AUTH_PATH' 23 messageLevelEnvVar =
'O2O_LOG_LEVEL' 24 logFolderEnvVar =
'O2O_LOG_FOLDER' 26 _Base = sqlalchemy.ext.declarative.declarative_base()
28 fmt_str =
"[%(asctime)s] %(levelname)s: %(message)s" 29 logLevel = logging.INFO
30 if messageLevelEnvVar
in os.environ:
31 levStr = os.environ[messageLevelEnvVar]
33 logLevel = logging.DEBUG
34 logFormatter = logging.Formatter(fmt_str)
37 __tablename__ =
'O2O_JOB' 38 __table_args__ = {
'schema' : schema_name}
39 name = sqlalchemy.Column(sqlalchemy.String(100), primary_key=
True)
40 enabled = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
41 tag_name = sqlalchemy.Column(sqlalchemy.String(100), nullable=
False)
42 interval = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
45 __tablename__ =
'O2O_JOB_CONF' 46 __table_args__ = {
'schema' : schema_name}
47 job_name = sqlalchemy.Column(sqlalchemy.ForeignKey(O2OJob.name), primary_key=
True)
48 insertion_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, primary_key=
True)
49 configuration = sqlalchemy.Column(sqlalchemy.String(4000), nullable=
False)
51 job = sqlalchemy.orm.relationship(
'O2OJob', primaryjoin=
"O2OJob.name==O2OJobConf.job_name")
54 __tablename__ =
'O2O_RUN' 55 __table_args__ = {
'schema' : schema_name}
56 job_name = sqlalchemy.Column(sqlalchemy.ForeignKey(O2OJob.name), primary_key=
True)
57 start_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, primary_key=
True)
58 end_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, nullable=
True)
59 status_code = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
60 log = sqlalchemy.Column(sqlalchemy.CLOB, nullable=
True)
62 job = sqlalchemy.orm.relationship(
'O2OJob', primaryjoin=
"O2OJob.name==O2ORun.job_name")
65 (username, account, pwd) = auth.get_credentials( authPathEnvVar, db_service[1], authFile )
86 line += (fmt.format( row[ind] )+
' ')
93 hsep += (fmt.format(
'')+
' ')
101 self.logger.setLevel(logLevel)
102 consoleHandler = logging.StreamHandler(sys.stdout)
103 consoleHandler.setFormatter(logFormatter)
104 self.logger.addHandler(consoleHandler)
112 if db_service
is None:
115 self.logger.info(
'Getting credentials')
117 if not os.path.exists(auth):
118 self.logger.error(
'Authentication path %s is invalid.' %auth)
122 except Exception
as e:
123 logging.debug(
str(e))
127 logging.error(
'Credentials for service %s (machine=%s) are not available' %(db_service[0],db_service[1]))
128 raise Exception(
"Cannot connect to db %s" %db_service[0] )
129 url = sqlalchemy_tpl %(username,pwd,db_service[0])
132 self.
eng = sqlalchemy.create_engine( url )
133 session = sqlalchemy.orm.scoped_session( sqlalchemy.orm.sessionmaker(bind=self.
eng))
134 except sqlalchemy.exc.SQLAlchemyError
as dberror:
135 self.logger.error(
str(dberror) )
141 O2OMgr.__init__(self)
149 with open( config_filename,
'r' ) as config_file: 150 config = config_file.read() 152 O2OMgr.logger( self).
error(
'The file %s contains an empty string.', config_filename )
156 O2OMgr.logger( self).
error(
'The file %s cannot be open.', config_filename )
157 except ValueError
as e:
159 O2OMgr.logger( self).
error(
'The file %s contains an invalid json string.', config_filename )
163 self.
session = O2OMgr.getSession( self,service, args.auth )
174 def add( self, job_name, config_filename, int_val, en_flag ):
175 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
180 O2OMgr.logger( self).
error(
"A job called '%s' exists already.", job_name )
185 job =
O2OJob(name=job_name,tag_name=
'-',enabled=en_flag,interval=int_val)
186 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
187 self.session.add(job)
188 self.session.add(config)
189 self.session.commit()
190 O2OMgr.logger( self).
info(
"New o2o job '%s' created.", job_name )
193 def set( self, job_name, en_flag ):
194 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
199 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", job_name )
201 job =
O2OJob(name=job_name,enabled=en_flag)
205 self.session.merge(job)
206 self.session.commit()
207 O2OMgr.logger( self).
info(
"Job '%s' %s." %(job_name,action) )
210 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
215 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", job_name )
220 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
221 self.session.add(config)
222 self.session.commit()
223 O2OMgr.logger( self).
info(
"New configuration inserted for job '%s'", job_name )
226 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
231 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", job_name )
233 job =
O2OJob(name=job_name,interval=int_val)
234 self.session.merge(job)
235 self.session.commit()
236 O2OMgr.logger( self).
info(
"The execution interval for job '%s' has been updated.", job_name )
239 res = self.session.query(O2OJob.name,O2OJob.tag_name)
245 configDict[
'tag']=tag
246 configJson = json.dumps( configDict )
247 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
248 self.session.add(config)
249 O2OMgr.logger( self).
info(
"Configuration for job '%s' inserted.", job_name )
250 self.session.commit()
254 res = self.session.query(O2ORun.job_name,sqlalchemy.func.max(O2ORun.start_time)).group_by(O2ORun.job_name).order_by(O2ORun.job_name)
256 runs[r[0]] =
str(r[1])
257 res = self.session.query(O2OJob.name, O2OJob.interval, O2OJob.enabled).order_by(O2OJob.name).
all()
262 row.append(
'%5d' %r[1] )
263 enabled =
'Y' if (r[2]==1)
else 'N' 264 row.append(
'%4s' %enabled )
266 if r[0]
in runs.keys():
268 row.append( lastRun )
270 headers = [
'Job name',
'Interval',
'Enabled',
'Last run start']
274 res = self.session.query(O2OJob.enabled).filter_by(name=jname)
279 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", jname )
281 res = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
284 configs.append((
str(r[0]),r[1]))
287 print "Configurations for job '%s'" %jname
288 for cf
in reversed(configs):
289 print '#%2d since: %s' %(ind,cf[1])
293 O2OMgr.logger( self ).
info(
"No configuration found for job '%s'" %jname )
296 versionIndex =
int(versionIndex)
297 res = self.session.query(O2OJob.enabled).filter_by(name=jname)
302 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", jname )
304 res = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
307 configs.append((
str(r[0]),r[1]))
309 if versionIndex>ind
or versionIndex==0:
310 O2OMgr.logger( self ).
error(
"Configuration for job %s with index %s has not been found." %(jname,versionIndex))
312 print "Configuration #%2d for job '%s'" %(versionIndex,jname)
313 config = configs[versionIndex-1]
314 print '#%2d since %s' %(versionIndex,config[1])
316 if configFile
is None or configFile ==
'':
317 configFile =
'%s_%s.json' %(jname,versionIndex)
318 with open(configFile,
'w')
as json_file:
319 json_file.write(config[0])
324 def __init__( self, db_connection, session, logger ):
334 self.logger.info(
'Checking job %s', job_name)
338 res = self.session.query(O2OJob.enabled,O2OJob.tag_name).filter_by(name=job_name)
344 self.logger.error(
'The job %s is unknown.', job_name )
347 res = self.session.query(O2OJobConf.configuration).filter_by(job_name=job_name).order_by(sqlalchemy.desc(O2OJobConf.insertion_time)).
first()
352 self.logger.warning(
"No configuration found for job '%s'" %job_name )
355 self.conf_dict.update( json.loads(conf) )
356 self.logger.info(
'Using configuration: %s ' %conf)
357 except Exception
as e:
358 self.logger.error(
str(e) )
361 self.
start = datetime.now()
363 self.session.add(run)
364 self.session.commit()
367 self.logger.info(
'The job %s has been disabled.', job_name )
369 except sqlalchemy.exc.SQLAlchemyError
as dberror:
370 self.logger.error(
str(dberror) )
376 self.
end = datetime.now()
379 self.session.merge(run)
380 self.session.commit()
381 self.logger.info(
'Job %s ended.', self.
job_name )
383 except sqlalchemy.exc.SQLAlchemyError
as dberror:
384 self.logger.error(
str(dberror) )
389 command = args.executable
390 logFolder = os.getcwd()
391 if logFolderEnvVar
in os.environ:
392 logFolder = os.environ[logFolderEnvVar]
393 datelabel = datetime.now().strftime(
"%y-%m-%d-%H-%M-%S")
394 logFileName =
'%s-%s.log' %(job_name,datelabel)
395 logFile = os.path.join(logFolder,logFileName)
404 command = command.format(**self.
conf_dict )
405 except KeyError
as exc:
406 self.logger.error(
"Unresolved template key %s in the command." %
str(exc) )
408 self.logger.info(
'Command: "%s"', command )
410 self.logger.info(
'Executing command...' )
411 pipe = subprocess.Popen( command, shell=
True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
413 for line
in iter(pipe.stdout.readline,
''):
415 sys.stdout.write(line)
419 self.logger.info(
'Command returned code: %s' %pipe.returncode )
420 ret = pipe.returncode
421 except Exception
as e:
422 self.logger.error(
str(e) )
424 ended = self.
endJob( pipe.returncode, out )
427 with open(logFile,
'a')
as logF:
437 parser = argparse.ArgumentParser(description=
'CMS o2o command-line tool. For general help (manual page), use the help subcommand.')
438 parser.add_argument(
'--db', type=str, help=
'The target database: pro ( for prod ) or dev ( for prep ). default=pro')
439 parser.add_argument(
"--auth",
"-a", type=str, help=
"The path of the authentication file")
440 parser.add_argument(
'--verbose',
'-v', action=
'count', help=
'The verbosity level')
441 parser_subparsers = parser.add_subparsers(title=
'Available subcommands')
442 parser_create = parser_subparsers.add_parser(
'create', description=
'Create a new O2O job')
443 parser_create.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
444 parser_create.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file path',required=
True)
445 parser_create.add_argument(
'--interval',
'-i', type=int, help=
'the chron job interval',default=0)
446 parser_create.set_defaults(func=self.
create)
447 parser_setConfig = parser_subparsers.add_parser(
'setConfig', description=
'Set a new configuration for the specified job. The configuration is expected as a list of entries "param": "value" (dictionary). The "param" labels will be used to inject the values in the command to execute. The dictionary is stored in JSON format.')
448 parser_setConfig.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
449 parser_setConfig.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file path',required=
True)
450 parser_setConfig.set_defaults(func=self.
setConfig)
451 parser_setInterval = parser_subparsers.add_parser(
'setInterval',description=
'Set a new execution interval for the specified job')
452 parser_setInterval.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
453 parser_setInterval.add_argument(
'--interval',
'-i', type=int, help=
'the chron job interval',required=
True)
454 parser_setInterval.set_defaults(func=self.
setInterval)
455 parser_enable = parser_subparsers.add_parser(
'enable',description=
'enable the O2O job')
456 parser_enable.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
457 parser_enable.set_defaults(func=self.
enable)
458 parser_disable = parser_subparsers.add_parser(
'disable',description=
'disable the O2O job')
459 parser_disable.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
460 parser_disable.set_defaults(func=self.
disable)
461 parser_migrateConf = parser_subparsers.add_parser(
'migrateConfig',description=
'migrate the tag info for the jobs in configuration entries')
462 parser_migrateConf.set_defaults(func=self.
migrate)
463 parser_listJobs = parser_subparsers.add_parser(
'listJobs', description=
'list the registered jobs')
464 parser_listJobs.set_defaults(func=self.
listJobs)
465 parser_listConf = parser_subparsers.add_parser(
'listConfig', description=
'shows the configurations for the specified job')
466 parser_listConf.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
467 parser_listConf.add_argument(
'--dump', type=int, help=
'Dump the specified config.',default=0)
468 parser_listConf.set_defaults(func=self.
listConf)
469 parser_dumpConf = parser_subparsers.add_parser(
'dumpConfig', description=
'dumps a specific job configuration version')
470 parser_dumpConf.add_argument(
'versionIndex', type=str,help=
'the version to dump')
471 parser_dumpConf.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
472 parser_dumpConf.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file name - default:[jobname]_[version].json')
473 parser_dumpConf.set_defaults(func=self.
dumpConf)
474 parser_run = parser_subparsers.add_parser(
'run', description=
'Wrapper for O2O jobs execution. Supports input parameter injection from the configuration file associated to the job. The formatting syntax supported are the python ones: "command -paramName {paramLabel}" or "command -paramName %(paramLabel)s". where [paramName] is the name of the parameter required for the command, and [paramLabel] is the key of the parameter entry in the config dictionary (recommended to be equal for clarity!"')
475 parser_run.add_argument(
'executable', type=str,help=
'command to execute')
476 parser_run.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
477 parser_run.set_defaults(func=self.
run)
479 args = parser.parse_args()
487 sys.exit( args.func())
488 except Exception
as e:
494 db_service = prod_db_service
495 if args.db
is not None:
496 if args.db ==
'dev' or args.db ==
'oradev' :
497 db_service = dev_db_service
498 elif args.db !=
'orapro' and args.db !=
'onlineorapro' and args.db !=
'pro':
499 raise Exception(
"Database '%s' is not known." %args.db )
502 return self.mgr.connect( db_service, args )
505 self.mgr.add( self.args.name, self.args.configFile, self.args.interval,
True )
508 self.mgr.setConfig( self.args.name, self.args.configFile )
511 self.mgr.setInterval( self.args.name, self.args.interval )
514 self.mgr.set( self.args.name,
True )
517 self.mgr.set( self.args.name,
False )
520 self.mgr.migrateConfig()
526 self.mgr.listConfig( self.args.name )
529 self.mgr.dumpConfig( self.args.name, self.args.versionIndex, self.args.configFile )
532 rmgr = self.mgr.runManager()
533 return rmgr.executeJob( self.
args )
def setConfig(self, job_name, config_filename)
def endJob(self, status, log)
def add(self, job_name, config_filename, int_val, en_flag)
def listConfig(self, jname)
def setInterval(self, job_name, int_val)
def connect(self, service, args)
def executeJob(self, args)
def readConfiguration(self, config_filename)
def set(self, job_name, en_flag)
def startJob(self, job_name)
def getSession(self, db_service, auth)
def __init__(self, db_connection, session, logger)
def dumpConfig(self, jname, versionIndex, configFile)
def print_table(headers, table)
def get_db_credentials(db_service, authFile)