1 from __future__
import print_function
2 __author__ =
'Giacomo Govi' 5 import sqlalchemy.ext.declarative
7 from datetime
import datetime
14 import CondCore.Utilities.credentials
as auth
16 prod_db_service = [
'cms_orcon_prod',
'cms_orcon_prod/cms_cond_general_w']
17 dev_db_service = [
'cms_orcoff_prep',
'cms_orcoff_prep/cms_test_conditions']
18 schema_name =
'CMS_CONDITIONS' 19 sqlalchemy_tpl =
'oracle://%s:%s@%s' 20 coral_tpl =
'oracle://%s/%s' 21 private_db =
'sqlite:///o2o_jobs.db' 23 authPathEnvVar =
'COND_AUTH_PATH' 24 messageLevelEnvVar =
'O2O_LOG_LEVEL' 25 logFolderEnvVar =
'O2O_LOG_FOLDER' 27 _Base = sqlalchemy.ext.declarative.declarative_base()
29 fmt_str =
"[%(asctime)s] %(levelname)s: %(message)s" 30 logLevel = logging.INFO
31 if messageLevelEnvVar
in os.environ:
32 levStr = os.environ[messageLevelEnvVar]
34 logLevel = logging.DEBUG
35 logFormatter = logging.Formatter(fmt_str)
38 __tablename__ =
'O2O_JOB' 39 __table_args__ = {
'schema' : schema_name}
40 name = sqlalchemy.Column(sqlalchemy.String(100), primary_key=
True)
41 enabled = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
42 tag_name = sqlalchemy.Column(sqlalchemy.String(100), nullable=
False)
43 interval = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
46 __tablename__ =
'O2O_JOB_CONF' 47 __table_args__ = {
'schema' : schema_name}
48 job_name = sqlalchemy.Column(sqlalchemy.ForeignKey(O2OJob.name), primary_key=
True)
49 insertion_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, primary_key=
True)
50 configuration = sqlalchemy.Column(sqlalchemy.String(4000), nullable=
False)
52 job = sqlalchemy.orm.relationship(
'O2OJob', primaryjoin=
"O2OJob.name==O2OJobConf.job_name")
55 __tablename__ =
'O2O_RUN' 56 __table_args__ = {
'schema' : schema_name}
57 job_name = sqlalchemy.Column(sqlalchemy.ForeignKey(O2OJob.name), primary_key=
True)
58 start_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, primary_key=
True)
59 end_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, nullable=
True)
60 status_code = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
61 log = sqlalchemy.Column(sqlalchemy.CLOB, nullable=
True)
63 job = sqlalchemy.orm.relationship(
'O2OJob', primaryjoin=
"O2OJob.name==O2ORun.job_name")
66 (username, account, pwd) = auth.get_credentials( authPathEnvVar, db_service[1], authFile )
87 line += (fmt.format( row[ind] )+
' ')
94 hsep += (fmt.format(
'')+
' ')
102 self.logger.setLevel(logLevel)
103 consoleHandler = logging.StreamHandler(sys.stdout)
104 consoleHandler.setFormatter(logFormatter)
105 self.logger.addHandler(consoleHandler)
113 if db_service
is None:
116 self.logger.info(
'Getting credentials')
118 if not os.path.exists(auth):
119 self.logger.error(
'Authentication path %s is invalid.' %auth)
123 except Exception
as e:
124 logging.debug(
str(e))
128 logging.error(
'Credentials for service %s (machine=%s) are not available' %(db_service[0],db_service[1]))
129 raise Exception(
"Cannot connect to db %s" %db_service[0] )
130 url = sqlalchemy_tpl %(username,pwd,db_service[0])
133 self.
eng = sqlalchemy.create_engine( url )
134 session = sqlalchemy.orm.scoped_session( sqlalchemy.orm.sessionmaker(bind=self.
eng))
135 except sqlalchemy.exc.SQLAlchemyError
as dberror:
136 self.logger.error(
str(dberror) )
142 O2OMgr.__init__(self)
150 with open( config_filename,
'r' ) as config_file: 151 config = config_file.read() 153 O2OMgr.logger( self).
error(
'The file %s contains an empty string.', config_filename )
157 O2OMgr.logger( self).
error(
'The file %s cannot be open.', config_filename )
158 except ValueError
as e:
160 O2OMgr.logger( self).
error(
'The file %s contains an invalid json string.', config_filename )
164 self.
session = O2OMgr.getSession( self,service, args.auth )
175 def add( self, job_name, config_filename, int_val, en_flag ):
176 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
181 O2OMgr.logger( self).
error(
"A job called '%s' exists already.", job_name )
186 job =
O2OJob(name=job_name,tag_name=
'-',enabled=en_flag,interval=int_val)
187 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
188 self.session.add(job)
189 self.session.add(config)
190 self.session.commit()
191 O2OMgr.logger( self).
info(
"New o2o job '%s' created.", job_name )
194 def set( self, job_name, en_flag ):
195 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
200 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", job_name )
202 job =
O2OJob(name=job_name,enabled=en_flag)
206 self.session.merge(job)
207 self.session.commit()
208 O2OMgr.logger( self).
info(
"Job '%s' %s." %(job_name,action) )
211 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
216 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", job_name )
221 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
222 self.session.add(config)
223 self.session.commit()
224 O2OMgr.logger( self).
info(
"New configuration inserted for job '%s'", job_name )
227 res = self.session.query(O2OJob.enabled).filter_by(name=job_name)
232 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", job_name )
234 job =
O2OJob(name=job_name,interval=int_val)
235 self.session.merge(job)
236 self.session.commit()
237 O2OMgr.logger( self).
info(
"The execution interval for job '%s' has been updated.", job_name )
240 res = self.session.query(O2OJob.name,O2OJob.tag_name)
246 configDict[
'tag']=tag
247 configJson = json.dumps( configDict )
248 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.now(), configuration = configJson )
249 self.session.add(config)
250 O2OMgr.logger( self).
info(
"Configuration for job '%s' inserted.", job_name )
251 self.session.commit()
255 res = self.session.query(O2ORun.job_name,sqlalchemy.func.max(O2ORun.start_time)).group_by(O2ORun.job_name).order_by(O2ORun.job_name)
257 runs[r[0]] =
str(r[1])
258 res = self.session.query(O2OJob.name, O2OJob.interval, O2OJob.enabled).order_by(O2OJob.name).
all()
263 row.append(
'%5d' %r[1] )
264 enabled =
'Y' if (r[2]==1)
else 'N' 265 row.append(
'%4s' %enabled )
267 if r[0]
in runs.keys():
269 row.append( lastRun )
271 headers = [
'Job name',
'Interval',
'Enabled',
'Last run start']
275 res = self.session.query(O2OJob.enabled).filter_by(name=jname)
280 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", jname )
282 res = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
285 configs.append((
str(r[0]),r[1]))
288 print(
"Configurations for job '%s'" %jname)
289 for cf
in reversed(configs):
290 print(
'#%2d since: %s' %(ind,cf[1]))
294 O2OMgr.logger( self ).
info(
"No configuration found for job '%s'" %jname )
297 versionIndex =
int(versionIndex)
298 res = self.session.query(O2OJob.enabled).filter_by(name=jname)
303 O2OMgr.logger( self).
error(
"A job called '%s' does not exist.", jname )
305 res = self.session.query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
308 configs.append((
str(r[0]),r[1]))
310 if versionIndex>ind
or versionIndex==0:
311 O2OMgr.logger( self ).
error(
"Configuration for job %s with index %s has not been found." %(jname,versionIndex))
313 print(
"Configuration #%2d for job '%s'" %(versionIndex,jname))
314 config = configs[versionIndex-1]
315 print(
'#%2d since %s' %(versionIndex,config[1]))
317 if configFile
is None or configFile ==
'':
318 configFile =
'%s_%s.json' %(jname,versionIndex)
319 with open(configFile,
'w')
as json_file:
320 json_file.write(config[0])
325 def __init__( self, db_connection, session, logger ):
335 self.logger.info(
'Checking job %s', job_name)
339 res = self.session.query(O2OJob.enabled,O2OJob.tag_name).filter_by(name=job_name)
345 self.logger.error(
'The job %s is unknown.', job_name )
348 res = self.session.query(O2OJobConf.configuration).filter_by(job_name=job_name).order_by(sqlalchemy.desc(O2OJobConf.insertion_time)).
first()
353 self.logger.warning(
"No configuration found for job '%s'" %job_name )
356 self.conf_dict.update( json.loads(conf) )
357 self.logger.info(
'Using configuration: %s ' %conf)
358 except Exception
as e:
359 self.logger.error(
str(e) )
362 self.
start = datetime.now()
364 self.session.add(run)
365 self.session.commit()
368 self.logger.info(
'The job %s has been disabled.', job_name )
370 except sqlalchemy.exc.SQLAlchemyError
as dberror:
371 self.logger.error(
str(dberror) )
377 self.
end = datetime.now()
380 self.session.merge(run)
381 self.session.commit()
382 self.logger.info(
'Job %s ended.', self.
job_name )
384 except sqlalchemy.exc.SQLAlchemyError
as dberror:
385 self.logger.error(
str(dberror) )
390 command = args.executable
391 logFolder = os.getcwd()
392 if logFolderEnvVar
in os.environ:
393 logFolder = os.environ[logFolderEnvVar]
394 datelabel = datetime.now().strftime(
"%y-%m-%d-%H-%M-%S")
395 logFileName =
'%s-%s.log' %(job_name,datelabel)
396 logFile = os.path.join(logFolder,logFileName)
405 command = command.format(**self.
conf_dict )
406 except KeyError
as exc:
407 self.logger.error(
"Unresolved template key %s in the command." %
str(exc) )
409 self.logger.info(
'Command: "%s"', command )
411 self.logger.info(
'Executing command...' )
412 pipe = subprocess.Popen( command, shell=
True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
414 for line
in iter(pipe.stdout.readline,
''):
416 sys.stdout.write(line)
420 self.logger.info(
'Command returned code: %s' %pipe.returncode )
421 ret = pipe.returncode
422 except Exception
as e:
423 self.logger.error(
str(e) )
425 ended = self.
endJob( pipe.returncode, out )
428 with open(logFile,
'a')
as logF:
438 parser = argparse.ArgumentParser(description=
'CMS o2o command-line tool. For general help (manual page), use the help subcommand.')
439 parser.add_argument(
'--db', type=str, help=
'The target database: pro ( for prod ) or dev ( for prep ). default=pro')
440 parser.add_argument(
"--auth",
"-a", type=str, help=
"The path of the authentication file")
441 parser.add_argument(
'--verbose',
'-v', action=
'count', help=
'The verbosity level')
442 parser_subparsers = parser.add_subparsers(title=
'Available subcommands')
443 parser_create = parser_subparsers.add_parser(
'create', description=
'Create a new O2O job')
444 parser_create.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
445 parser_create.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file path',required=
True)
446 parser_create.add_argument(
'--interval',
'-i', type=int, help=
'the chron job interval',default=0)
447 parser_create.set_defaults(func=self.
create)
448 parser_setConfig = parser_subparsers.add_parser(
'setConfig', description=
'Set a new configuration for the specified job. The configuration is expected as a list of entries "param": "value" (dictionary). The "param" labels will be used to inject the values in the command to execute. The dictionary is stored in JSON format.')
449 parser_setConfig.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
450 parser_setConfig.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file path',required=
True)
451 parser_setConfig.set_defaults(func=self.
setConfig)
452 parser_setInterval = parser_subparsers.add_parser(
'setInterval',description=
'Set a new execution interval for the specified job')
453 parser_setInterval.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
454 parser_setInterval.add_argument(
'--interval',
'-i', type=int, help=
'the chron job interval',required=
True)
455 parser_setInterval.set_defaults(func=self.
setInterval)
456 parser_enable = parser_subparsers.add_parser(
'enable',description=
'enable the O2O job')
457 parser_enable.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
458 parser_enable.set_defaults(func=self.
enable)
459 parser_disable = parser_subparsers.add_parser(
'disable',description=
'disable the O2O job')
460 parser_disable.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
461 parser_disable.set_defaults(func=self.
disable)
462 parser_migrateConf = parser_subparsers.add_parser(
'migrateConfig',description=
'migrate the tag info for the jobs in configuration entries')
463 parser_migrateConf.set_defaults(func=self.
migrate)
464 parser_listJobs = parser_subparsers.add_parser(
'listJobs', description=
'list the registered jobs')
465 parser_listJobs.set_defaults(func=self.
listJobs)
466 parser_listConf = parser_subparsers.add_parser(
'listConfig', description=
'shows the configurations for the specified job')
467 parser_listConf.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
468 parser_listConf.add_argument(
'--dump', type=int, help=
'Dump the specified config.',default=0)
469 parser_listConf.set_defaults(func=self.
listConf)
470 parser_dumpConf = parser_subparsers.add_parser(
'dumpConfig', description=
'dumps a specific job configuration version')
471 parser_dumpConf.add_argument(
'versionIndex', type=str,help=
'the version to dump')
472 parser_dumpConf.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
473 parser_dumpConf.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file name - default:[jobname]_[version].json')
474 parser_dumpConf.set_defaults(func=self.
dumpConf)
475 parser_run = parser_subparsers.add_parser(
'run', description=
'Wrapper for O2O jobs execution. Supports input parameter injection from the configuration file associated to the job. The formatting syntax supported are the python ones: "command -paramName {paramLabel}" or "command -paramName %(paramLabel)s". where [paramName] is the name of the parameter required for the command, and [paramLabel] is the key of the parameter entry in the config dictionary (recommended to be equal for clarity!"')
476 parser_run.add_argument(
'executable', type=str,help=
'command to execute')
477 parser_run.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
478 parser_run.set_defaults(func=self.
run)
480 args = parser.parse_args()
488 sys.exit( args.func())
489 except Exception
as e:
495 db_service = prod_db_service
496 if args.db
is not None:
497 if args.db ==
'dev' or args.db ==
'oradev' :
498 db_service = dev_db_service
499 elif args.db !=
'orapro' and args.db !=
'onlineorapro' and args.db !=
'pro':
500 raise Exception(
"Database '%s' is not known." %args.db )
503 return self.mgr.connect( db_service, args )
506 self.mgr.add( self.args.name, self.args.configFile, self.args.interval,
True )
509 self.mgr.setConfig( self.args.name, self.args.configFile )
512 self.mgr.setInterval( self.args.name, self.args.interval )
515 self.mgr.set( self.args.name,
True )
518 self.mgr.set( self.args.name,
False )
521 self.mgr.migrateConfig()
527 self.mgr.listConfig( self.args.name )
530 self.mgr.dumpConfig( self.args.name, self.args.versionIndex, self.args.configFile )
533 rmgr = self.mgr.runManager()
534 return rmgr.executeJob( self.
args )
def setConfig(self, job_name, config_filename)
def endJob(self, status, log)
def add(self, job_name, config_filename, int_val, en_flag)
def listConfig(self, jname)
S & print(S &os, JobReport::InputFile const &f)
def setInterval(self, job_name, int_val)
def connect(self, service, args)
def executeJob(self, args)
def readConfiguration(self, config_filename)
def set(self, job_name, en_flag)
def startJob(self, job_name)
def getSession(self, db_service, auth)
def __init__(self, db_connection, session, logger)
def dumpConfig(self, jname, versionIndex, configFile)
def print_table(headers, table)
def get_db_credentials(db_service, authFile)