1 from __future__
import print_function
2 __author__ =
'Giacomo Govi'
5 import sqlalchemy.ext.declarative
7 from datetime
import datetime
14 import CondCore.Utilities.credentials
as auth
# --- Module-level configuration -------------------------------------------
# Service name used as the default target database (see the code that
# assigns `db_service = prod_db_service` before inspecting `--db`).
16 prod_db_service =
'cms_orcon_prod'
# Service name selected when `--db` is 'dev' or 'oradev'.
17 dev_db_service =
'cms_orcoff_prep'
# Schema holding the O2O tables; used in every `__table_args__` below and
# passed to auth.get_credentials_for_schema() when connecting.
18 schema_name =
'CMS_CONDITIONS'
# SQLAlchemy connection URL template, filled with (username, pwd, db_service).
19 sqlalchemy_tpl =
'oracle://%s:%s@%s'
# Second Oracle URL template with two placeholders.
# NOTE(review): its use is not visible in this excerpt -- confirm what the
# two fields are before relying on it.
20 coral_tpl =
'oracle://%s/%s'
# SQLite URL for a local jobs database.
# NOTE(review): no use of this value is visible in this excerpt.
21 private_db =
'sqlite:///o2o_jobs.db'
# Name of the environment variable read to obtain the message level string.
23 messageLevelEnvVar =
'O2O_LOG_LEVEL'
# Name of the environment variable that overrides the folder where job log
# files are written (the current working directory is used otherwise).
24 logFolderEnvVar =
'O2O_LOG_FOLDER'
# Module-level logger named after this module.
25 logger = logging.getLogger(__name__)
# Declarative base object created once at import time.
# NOTE(review): presumably the base class of the mapped table classes that
# follow; their class headers are not visible here -- confirm.
27 _Base = sqlalchemy.ext.declarative.declarative_base()
# Column declarations for the O2O_JOB table.
# NOTE(review): presumably the body of the mapped class referred to elsewhere
# in this file as `O2OJob`; the class header is not visible here -- confirm.
30 __tablename__ =
'O2O_JOB'
# Place the table in the schema named by the module-level `schema_name`.
31 __table_args__ = {
'schema' : schema_name}
# Job name (up to 100 characters); primary key of the table.
32 name = sqlalchemy.Column(sqlalchemy.String(100), primary_key=
True)
# Mandatory integer flag; the job listing prints 'Y' when it equals 1.
33 enabled = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
# Mandatory integer flag; the job listing prints 'Y' when it equals 1.
34 frequent = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
# Mandatory string (up to 100 characters); newly created jobs store '-'.
35 tag_name = sqlalchemy.Column(sqlalchemy.String(100), nullable=
False)
# Mandatory integer execution interval.
# NOTE(review): the unit is not stated anywhere in the visible code.
36 interval = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
# Column declarations for the O2O_JOB_CONF table.
# NOTE(review): presumably the body of the mapped class `O2OJobConf` named in
# the primaryjoin below; the class header is not visible here -- confirm.
39 __tablename__ =
'O2O_JOB_CONF'
# Place the table in the schema named by the module-level `schema_name`.
40 __table_args__ = {
'schema' : schema_name}
# Foreign key to O2OJob.name; first half of the composite primary key.
41 job_name = sqlalchemy.Column(sqlalchemy.ForeignKey(O2OJob.name), primary_key=
True)
# Timestamp of the insertion; second half of the composite primary key, so
# one job can hold several configurations distinguished by insertion time.
42 insertion_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, primary_key=
True)
# Mandatory configuration text, limited to 4000 characters.
43 configuration = sqlalchemy.Column(sqlalchemy.String(4000), nullable=
False)
# ORM relationship to the owning job, joined on the job name.
45 job = sqlalchemy.orm.relationship(
'O2OJob', primaryjoin=
"O2OJob.name==O2OJobConf.job_name")
# Column declarations for the O2O_RUN table.
# NOTE(review): presumably the body of the mapped class `O2ORun` named in the
# primaryjoin below; the class header is not visible here -- confirm.
48 __tablename__ =
'O2O_RUN'
# Place the table in the schema named by the module-level `schema_name`.
49 __table_args__ = {
'schema' : schema_name}
# Foreign key to O2OJob.name; first half of the composite primary key.
50 job_name = sqlalchemy.Column(sqlalchemy.ForeignKey(O2OJob.name), primary_key=
True)
# Start timestamp of the run; second half of the composite primary key.
51 start_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, primary_key=
True)
# End timestamp of the run; nullable.
52 end_time = sqlalchemy.Column(sqlalchemy.TIMESTAMP, nullable=
True)
# Mandatory integer status code of the run.
53 status_code = sqlalchemy.Column(sqlalchemy.Integer, nullable=
False)
# Optional large text column (CLOB) holding the run log.
54 log = sqlalchemy.Column(sqlalchemy.CLOB, nullable=
True)
# ORM relationship to the owning job, joined on the job name.
56 job = sqlalchemy.orm.relationship(
'O2OJob', primaryjoin=
"O2OJob.name==O2ORun.job_name")
76 line += (fmt.format( row[ind] )+
' ')
83 hsep += (fmt.format(
'')+
' ')
94 fmt_str =
"[%(asctime)s] %(levelname)s: %(message)s"
95 if messageLevelEnvVar
in os.environ:
96 levStr = os.environ[messageLevelEnvVar]
98 logLevel = logging.DEBUG
99 logFormatter = logging.Formatter(fmt_str)
102 self.
logger.setLevel(logLevel)
103 consoleHandler = logging.StreamHandler(sys.stdout)
104 consoleHandler.setFormatter(logFormatter)
105 self.
logger.addHandler(consoleHandler)
110 if db_service
is None:
114 if authPath
is not None:
115 if not os.path.exists(authPath):
116 self.
logger.
error(
'Authentication path %s is invalid.' %authPath)
119 (username, account, pwd) = auth.get_credentials_for_schema( db_service, schema_name, role, authPath )
120 except Exception
as e:
125 self.
logger.
error(
'Credentials for service %s are not available' %db_service)
126 raise Exception(
"Cannot connect to db %s" %db_service )
127 url = sqlalchemy_tpl %(username,pwd,db_service)
130 self.
eng = sqlalchemy.create_engine( url, max_identifier_length=30)
131 session = sqlalchemy.orm.scoped_session( sqlalchemy.orm.sessionmaker(bind=self.
eng))
132 except sqlalchemy.exc.SQLAlchemyError
as dberror:
148 def add( self, job_name, configJson, int_val, freq_flag, en_flag ):
151 res = self.
session.
query(O2OJob.enabled).filter_by(name=job_name)
156 self.
logger.
error(
"A job called '%s' exists already.", job_name )
161 job =
O2OJob(name=job_name,tag_name=
'-',enabled=en_flag,frequent=freq_val,interval=int_val)
162 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.utcnow(), configuration = configJson )
166 self.
logger.
info(
"New o2o job '%s' created.", job_name )
169 def set( self, job_name, en_flag, fr_val=None ):
170 res = self.
session.
query(O2OJob.enabled).filter_by(name=job_name)
175 self.
logger.
error(
"A job called '%s' does not exist.", job_name )
177 if en_flag
is not None and enabled != en_flag:
178 job =
O2OJob(name=job_name,enabled=en_flag)
184 self.
logger.
info(
"Job '%s' %s." %(job_name,action) )
185 if fr_val
is not None:
186 job =
O2OJob(name=job_name,frequent=fr_val)
190 self.
logger.
info(
"Job '%s' set 'frequent'" %job_name)
192 self.
logger.
info(
"Job '%s' unset 'frequent'" %job_name)
197 res = self.
session.
query(O2OJob.enabled).filter_by(name=job_name)
202 self.
logger.
error(
"A job called '%s' does not exist.", job_name )
204 config =
O2OJobConf( job_name=job_name, insertion_time = datetime.utcnow(), configuration = configJson )
207 self.
logger.
info(
"New configuration inserted for job '%s'", job_name )
211 res = self.
session.
query(O2OJob.enabled).filter_by(name=job_name)
216 self.
logger.
error(
"A job called '%s' does not exist.", job_name )
218 job =
O2OJob(name=job_name,interval=int_val)
221 self.
logger.
info(
"The execution interval for job '%s' has been updated.", job_name )
225 res = self.
session.
query(O2ORun.job_name,sqlalchemy.func.max(O2ORun.start_time)).group_by(O2ORun.job_name).order_by(O2ORun.job_name)
227 runs[r[0]] =
str(r[1])
228 res = self.
session.
query(O2OJob.name, O2OJob.interval, O2OJob.enabled, O2OJob.frequent).order_by(O2OJob.name).
all()
233 row.append(
'%5d' %r[1] )
234 frequent =
'Y' if (r[3]==1)
else 'N'
235 row.append(
'%4s' %frequent )
236 enabled =
'Y' if (r[2]==1)
else 'N'
237 row.append(
'%4s' %enabled )
239 if r[0]
in runs.keys():
241 row.append( lastRun )
243 headers = [
'Job name',
'Interval',
'Frequent',
'Enabled',
'Last run start']
247 res = self.
session.
query(O2OJob.enabled).filter_by(name=jname)
252 self.
logger.
error(
"A job called '%s' does not exist.", jname )
254 res = self.
session.
query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
257 configs.append((
str(r[0]),r[1]))
260 print(
"Configurations for job '%s'" %jname)
261 for cf
in reversed(configs):
262 print(
'#%2d since: %s' %(ind,cf[1]))
266 self.
logger.
info(
"No configuration found for job '%s'" %jname )
269 versionIndex =
int(versionIndex)
270 res = self.
session.
query(O2OJob.enabled).filter_by(name=jname)
275 self.
logger.
error(
"A job called '%s' does not exist.", jname )
277 res = self.
session.
query( O2OJobConf.configuration, O2OJobConf.insertion_time ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
280 configs.append((
str(r[0]),r[1]))
282 if versionIndex>ind
or versionIndex==0:
283 self.
logger.
error(
"Configuration for job %s with index %s has not been found." %(jname,versionIndex))
285 print(
"Configuration #%2d for job '%s'" %(versionIndex,jname))
286 config = configs[versionIndex-1]
287 print(
'#%2d since %s' %(versionIndex,config[1]))
289 if configFile
is None or configFile ==
'':
290 configFile =
'%s_%s.json' %(jname,versionIndex)
291 with open(configFile,
'w')
as json_file:
292 json_file.write(config[0])
297 def __init__( self, db_connection, session, logger ):
311 res = self.
session.
query(O2OJob.enabled,O2OJob.tag_name).filter_by(name=job_name)
317 self.
logger.
error(
'The job %s is unknown.', job_name )
320 res = self.
session.
query(O2OJobConf.configuration).filter_by(job_name=job_name).order_by(sqlalchemy.desc(O2OJobConf.insertion_time)).
first()
325 self.
logger.
warning(
"No configuration found for job '%s'" %job_name )
329 self.
logger.
info(
'Using configuration: %s ' %conf)
330 except Exception
as e:
334 self.
start = datetime.utcnow()
340 self.
logger.
info(
'The job %s has been disabled.', job_name )
342 except sqlalchemy.exc.SQLAlchemyError
as dberror:
349 self.
end = datetime.utcnow()
356 except sqlalchemy.exc.SQLAlchemyError
as dberror:
362 command = args.executable
363 logFolder = os.getcwd()
364 if logFolderEnvVar
in os.environ:
365 logFolder = os.environ[logFolderEnvVar]
366 datelabel = datetime.utcnow().strftime(
"%y-%m-%d-%H-%M-%S")
367 logFileName =
'%s-%s.log' %(job_name,datelabel)
368 logFile = os.path.join(logFolder,logFileName)
377 command = command.format(**self.
conf_dict )
378 except KeyError
as exc:
379 self.
logger.
error(
"Unresolved template key %s in the command." %
str(exc) )
385 pipe = subprocess.Popen( command, shell=
True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
386 for line
in pipe.stdout:
387 if args.verbose
is not None and args.verbose>=1:
388 sys.stdout.write(line.decode())
392 self.
logger.
info(
'Command returned code: %s' %pipe.returncode )
393 ret = pipe.returncode
394 except Exception
as e:
397 ended = self.
endJob( pipe.returncode, out )
400 with open(logFile,
'a')
as logF:
407 with open( config_filename,
'r' )
as config_file:
408 config = config_file.read().
strip(
'\n')
410 logging.error(
'The file %s contains an empty string.', config_filename )
414 logging.error(
'The file %s cannot be open.', config_filename )
415 except ValueError
as e:
417 logging.error(
'The file "%s" contains an invalid json string.', config_filename )
421 config = config_string
424 except ValueError
as e:
426 logging.error(
'The string "%s" is an invalid json format.', config_string )
# --- Command-line interface definition -------------------------------------
# Builds the top-level parser, registers one sub-parser per subcommand and
# finally parses the process arguments. Every sub-parser stores, through
# set_defaults(), the bound method to invoke (`func`) and a `role` taken from
# the `auth` module.
# NOTE(review): the description mentions a "help" subcommand, but no such
# sub-parser is registered in the lines visible here -- confirm.
436 parser = argparse.ArgumentParser(description=
'CMS o2o command-line tool. For general help (manual page), use the help subcommand.')
# Global options, valid for every subcommand.
437 parser.add_argument(
'--db', type=str, help=
'The target database: pro ( for prod ) or dev ( for prep ). default=pro')
438 parser.add_argument(
"--auth",
"-a", type=str, help=
"The path of the authentication file")
# action='count': the attribute is None when the flag is absent and the
# number of occurrences otherwise, hence the `is not None` checks on it.
439 parser.add_argument(
'--verbose',
'-v', action=
'count', help=
'The verbosity level')
440 parser_subparsers = parser.add_subparsers(title=
'Available subcommands')
# 'create': register a new job (admin role).
441 parser_create = parser_subparsers.add_parser(
'create', description=
'Create a new O2O job')
442 parser_create.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
# --configFile and --configString are both optional at the parser level.
443 parser_create.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file path')
444 parser_create.add_argument(
'--configString',
'-s', type=str, help=
'the JSON configuration string')
445 parser_create.add_argument(
'--interval',
'-i', type=int, help=
'the chron job interval',default=0)
446 parser_create.add_argument(
'--frequent',
'-f',action=
'store_true',help=
'set the "frequent" flag for this job ("false" by default)')
447 parser_create.set_defaults(func=self.
create,role=auth.admin_role)
# 'setConfig': store a new configuration for an existing job (admin role).
448 parser_setConfig = parser_subparsers.add_parser(
'setConfig', description=
'Set a new configuration for the specified job. The configuration is expected as a list of entries "param": "value" (dictionary). The "param" labels will be used to inject the values in the command to execute. The dictionary is stored in JSON format.')
449 parser_setConfig.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
450 parser_setConfig.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file path')
451 parser_setConfig.add_argument(
'--configString',
'-s', type=str, help=
'the JSON configuration string')
452 parser_setConfig.set_defaults(func=self.
setConfig,role=auth.admin_role)
# 'setFrequent': set the "frequent" flag (admin role). The flag is restricted
# to the strings '0' and '1' and is therefore delivered as a string.
453 parser_setFrequent = parser_subparsers.add_parser(
'setFrequent',description=
'Set the "frequent" flag for the specified job')
454 parser_setFrequent.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
455 parser_setFrequent.add_argument(
'--flag',
'-f', choices=[
'0',
'1'], help=
'the flag value to set',required=
True)
456 parser_setFrequent.set_defaults(func=self.
setFrequent,role=auth.admin_role)
# 'setInterval': change the execution interval of a job (admin role).
457 parser_setInterval = parser_subparsers.add_parser(
'setInterval',description=
'Set a new execution interval for the specified job')
458 parser_setInterval.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
459 parser_setInterval.add_argument(
'--interval',
'-i', type=int, help=
'the chron job interval',required=
True)
460 parser_setInterval.set_defaults(func=self.
setInterval,role=auth.admin_role)
# 'enable' / 'disable': toggle a job (admin role).
461 parser_enable = parser_subparsers.add_parser(
'enable',description=
'enable the O2O job')
462 parser_enable.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
463 parser_enable.set_defaults(func=self.
enable,role=auth.admin_role)
464 parser_disable = parser_subparsers.add_parser(
'disable',description=
'disable the O2O job')
465 parser_disable.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
466 parser_disable.set_defaults(func=self.
disable,role=auth.admin_role)
# 'listJobs': list the registered jobs (reader role); takes no options.
467 parser_listJobs = parser_subparsers.add_parser(
'listJobs', description=
'list the registered jobs')
468 parser_listJobs.set_defaults(func=self.
listJobs,role=auth.reader_role)
# 'listConfig': show the configurations of one job (reader role).
469 parser_listConf = parser_subparsers.add_parser(
'listConfig', description=
'shows the configurations for the specified job')
470 parser_listConf.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
# NOTE(review): no consumer of `--dump` is visible in this excerpt -- confirm
# that the handler actually reads it.
471 parser_listConf.add_argument(
'--dump', type=int, help=
'Dump the specified config.',default=0)
472 parser_listConf.set_defaults(func=self.
listConf,role=auth.reader_role)
# 'dumpConfig': write one configuration version to a file (reader role).
473 parser_dumpConf = parser_subparsers.add_parser(
'dumpConfig', description=
'dumps a specific job configuration version')
# Positional argument, parsed as a string here; the dump code converts it
# with int() before using it as an index.
474 parser_dumpConf.add_argument(
'versionIndex', type=str,help=
'the version to dump')
475 parser_dumpConf.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
476 parser_dumpConf.add_argument(
'--configFile',
'-c', type=str, help=
'the JSON configuration file name - default:[jobname]_[version].json')
477 parser_dumpConf.set_defaults(func=self.
dumpConf,role=auth.reader_role)
# 'run': execute a command on behalf of a job (writer role).
478 parser_run = parser_subparsers.add_parser(
'run', description=
'Wrapper for O2O jobs execution. Supports input parameter injection from the configuration file associated to the job. The formatting syntax supported are the python ones: "command -paramName {paramLabel}" or "command -paramName %(paramLabel)s". where [paramName] is the name of the parameter required for the command, and [paramLabel] is the key of the parameter entry in the config dictionary (recommended to be equal for clarity!"')
# Positional argument holding the command line to execute.
479 parser_run.add_argument(
'executable', type=str,help=
'command to execute')
480 parser_run.add_argument(
'--name',
'-n', type=str, help=
'The o2o job name',required=
True)
481 parser_run.set_defaults(func=self.
run,role=auth.writer_role)
# Parse the process command line (sys.argv is used since no list is passed).
483 args = parser.parse_args()
485 if args.verbose
is not None and args.verbose >=1:
491 sys.exit( args.func())
492 except Exception
as e:
498 db_service = prod_db_service
499 if args.db
is not None:
500 if args.db ==
'dev' or args.db ==
'oradev' :
501 db_service = dev_db_service
502 elif args.db !=
'orapro' and args.db !=
'onlineorapro' and args.db !=
'pro':
503 raise Exception(
"Database '%s' is not known." %args.db )
505 logLevel = logging.DEBUG
if args.verbose
is not None and args.verbose >= 1
else logging.INFO
511 if self.
args.configFile
is not None:
512 if self.
args.configString
is not None:
513 logging.error(
'Ambigouous input provided: please specify a configFile OR a configString')
518 if self.
args.configString
is None:
519 logging.error(
'No configuration has been provided: please specify "configFile" or "configString" param.')
523 self.
mgr.
add( self.
args.name, configJson, self.
args.interval, self.
args.frequent,
True )
527 if self.
args.configFile
is not None:
528 if self.
args.configString
is not None:
529 logging.error(
'Ambigouous input provided: please specify a configFile OR a configString')
534 if self.
args.configString
is None:
535 logging.error(
'No configuration has been provided: please specify "configFile" or "configString" param.')
545 self.
mgr.set( self.
args.name,
True )
548 self.
mgr.set( self.
args.name,
False )
557 self.
mgr.listConfig( self.
args.name )
563 rmgr = self.
mgr.runManager()
564 return rmgr.executeJob( self.
args )