CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
List of all members | Public Member Functions | Public Attributes
o2olib.O2OJobMgr Class Reference
Inheritance diagram for o2olib.O2OJobMgr:

Public Member Functions

def __init__
 
def add
 
def connect
 
def dumpConfig
 
def getSession
 
def listConfig
 
def listJobs
 
def runManager
 
def set
 
def setConfig
 
def setInterval
 

Public Attributes

 conf_dict
 
 db_connection
 
 eng
 
 logger
 
 session
 
 verbose
 

Detailed Description

Definition at line 89 of file o2olib.py.

Constructor & Destructor Documentation

def o2olib.O2OJobMgr.__init__ (   self,
  logLevel 
)

Definition at line 91 of file o2olib.py.

91 
def __init__(self, logLevel):
    """Create a job manager with no database connection and a console logger.

    Args:
        logLevel: logging level applied to the logger. It is replaced by
            ``logging.DEBUG`` when the environment variable named by
            ``messageLevelEnvVar`` is set to the string ``'DEBUG'``.

    NOTE: the logger is the root logger (``logging.getLogger()`` without a
    name), and a new stdout handler is attached to it on every call.
    """
    self.db_connection = None
    self.conf_dict = {}
    self.eng = None

    # Only the exact value 'DEBUG' in the environment overrides the level.
    if os.environ.get(messageLevelEnvVar) == 'DEBUG':
        logLevel = logging.DEBUG

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter("[%(asctime)s] %(levelname)s: %(message)s")
    )

    root_logger = logging.getLogger()
    root_logger.setLevel(logLevel)
    root_logger.addHandler(stdout_handler)
    self.logger = root_logger

Member Function Documentation

def o2olib.O2OJobMgr.add (   self,
  job_name,
  configJson,
  int_val,
  freq_flag,
  en_flag 
)

Definition at line 148 of file o2olib.py.

Referenced by counter.Counter.register(), SequenceTypes.Task.remove(), and SequenceTypes.Task.replace().

def add(self, job_name, configJson, int_val, freq_flag, en_flag):
    """Create a new o2o job together with its first configuration.

    Args:
        job_name: name of the job to create.
        configJson: configuration text stored for the job; an empty string
            is rejected.
        int_val: value stored in the job's ``interval`` column.
        freq_flag: truthy to store ``frequent=1``, otherwise ``frequent=0``.
        en_flag: value stored in the job's ``enabled`` column.

    Returns:
        True when the job and its configuration were committed, False when
        ``configJson`` is empty or a job with the same name already exists.
    """
    if configJson == '':
        return False

    # Any row returned for this name means the job is already registered.
    # Test for presence explicitly (as the sibling methods do) instead of
    # relying on the truthiness of the result row.
    existing = None
    for row in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        existing = row
    if existing is not None:
        self.logger.error("A job called '%s' exists already.", job_name)
        return False

    freq_val = 1 if freq_flag else 0
    job = O2OJob(name=job_name, tag_name='-', enabled=en_flag,
                 frequent=freq_val, interval=int_val)
    config = O2OJobConf(job_name=job_name,
                        insertion_time=datetime.utcnow(),
                        configuration=configJson)
    # Job and configuration are committed together in one transaction.
    self.session.add(job)
    self.session.add(config)
    self.session.commit()
    self.logger.info("New o2o job '%s' created.", job_name)
    return True
def o2olib.O2OJobMgr.connect (   self,
  service,
  args 
)

Definition at line 136 of file o2olib.py.

Referenced by o2o_db_cfgmap.DbManagerDAQ.update_hashmap().

def connect(self, service, args):
    """Open a database session and record the connection string.

    Args:
        service: database service name, passed on to ``getSession`` and
            used to build the connection string.
        args: object providing the ``role``, ``auth`` and ``verbose``
            attributes.

    Returns:
        True when a session was obtained, False when ``getSession``
        returned None.
    """
    session = self.getSession(service, args.role, args.auth)
    self.session = session
    self.verbose = args.verbose
    if session is None:
        return False

    connection_string = coral_tpl % (service, schema_name)
    self.db_connection = connection_string
    self.conf_dict['db'] = connection_string
    return True
def o2olib.O2OJobMgr.dumpConfig (   self,
  jname,
  versionIndex,
  configFile 
)

Definition at line 268 of file o2olib.py.

References print(), and str.

Referenced by Types.SecSource.configValue().

def dumpConfig(self, jname, versionIndex, configFile):
    """Print one stored configuration of a job and write it to a file.

    Configurations are numbered from 1 in order of insertion time.

    Args:
        jname: name of the job.
        versionIndex: 1-based index of the configuration version; converted
            with ``int()``.
        configFile: output file path. When None or empty, the file name
            ``<jname>_<versionIndex>.json`` is used.

    Returns:
        None. Errors (unknown job, index out of range) are logged.
    """
    versionIndex = int(versionIndex)

    job_row = None
    for row in self.session.query(O2OJob.enabled).filter_by(name=jname):
        job_row = row
    if job_row is None:
        self.logger.error("A job called '%s' does not exist.", jname)
        return

    res = self.session.query(
        O2OJobConf.configuration, O2OJobConf.insertion_time
    ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
    configs = [(str(r[0]), r[1]) for r in res]

    # Valid indices are 1..len(configs). Negative values must be rejected
    # as well: they would otherwise index the list from its end and return
    # a different version than the one requested (or raise IndexError).
    if versionIndex < 1 or versionIndex > len(configs):
        self.logger.error("Configuration for job %s with index %s has not been found." % (jname, versionIndex))
        return

    print("Configuration #%2d for job '%s'" % (versionIndex, jname))
    config = configs[versionIndex - 1]
    print('#%2d since %s' % (versionIndex, config[1]))
    print(config[0])

    if configFile is None or configFile == '':
        configFile = '%s_%s.json' % (jname, versionIndex)
    with open(configFile, 'w') as json_file:
        json_file.write(config[0])
294 
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
#define str(s)
def o2olib.O2OJobMgr.getSession (   self,
  db_service,
  role,
  authPath 
)

Definition at line 108 of file o2olib.py.

References o2olib.O2OJobMgr.eng, and str.

def getSession(self, db_service, role, authPath):
    """Build a SQLAlchemy scoped session for the requested service.

    Args:
        db_service: database service name; when None the ``private_db``
            URL is used and no credentials are looked up.
        role: role passed to the credential lookup.
        authPath: authentication path passed to the credential lookup; when
            not None it must exist on disk.

    Returns:
        The scoped session, or None when ``authPath`` does not exist or
        SQLAlchemy reports an error while creating the engine or session.

    Raises:
        Exception: when no credentials are available for ``db_service``.
    """
    if db_service is None:
        url = private_db
    else:
        self.logger.info('Getting credentials')
        if authPath is not None and not os.path.exists(authPath):
            self.logger.error('Authentication path %s is invalid.' %authPath)
            return None

        # A failed lookup is only reported at debug level; the missing
        # user name is turned into an error just below.
        username = pwd = None
        try:
            username, _account, pwd = auth.get_credentials_for_schema(
                db_service, schema_name, role, authPath
            )
        except Exception as exc:
            self.logger.debug(str(exc))

        if username is None:
            self.logger.error('Credentials for service %s are not available' %db_service)
            raise Exception("Cannot connect to db %s" %db_service )
        url = sqlalchemy_tpl % (username, pwd, db_service)

    try:
        self.eng = sqlalchemy.create_engine(url, max_identifier_length=30)
        factory = sqlalchemy.orm.sessionmaker(bind=self.eng)
        return sqlalchemy.orm.scoped_session(factory)
    except sqlalchemy.exc.SQLAlchemyError as dberror:
        self.logger.error(str(dberror))
        return None
#define str(s)
def o2olib.O2OJobMgr.listConfig (   self,
  jname 
)

Definition at line 246 of file o2olib.py.

References print(), and str.

def listConfig(self, jname):
    """Print every stored configuration of a job, newest first.

    Versions are numbered from 1 in order of insertion time, so the first
    entry printed carries the highest number.

    Args:
        jname: name of the job.

    Returns:
        None. An unknown job is logged as an error; a job without any
        configuration is logged at info level.
    """
    job_rows = list(self.session.query(O2OJob.enabled).filter_by(name=jname))
    if not job_rows:
        self.logger.error( "A job called '%s' does not exist.", jname )
        return

    stored = self.session.query(
        O2OJobConf.configuration, O2OJobConf.insertion_time
    ).filter_by(job_name=jname).order_by(O2OJobConf.insertion_time)
    history = [(str(entry[0]), entry[1]) for entry in stored]

    if not history:
        self.logger.info("No configuration found for job '%s'" %jname )
        return

    print("Configurations for job '%s'" %jname)
    # Walk the version numbers downwards: latest configuration first.
    for version in range(len(history), 0, -1):
        text, since = history[version - 1]
        print('#%2d since: %s' %(version, since))
        print(text)
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
#define str(s)
def o2olib.O2OJobMgr.listJobs (   self)

Definition at line 223 of file o2olib.py.

References python.cmstools.all(), o2olib.print_table(), and str.

Referenced by o2olib.O2OTool.execute().

def listJobs(self):
    """Print a table of all jobs with their settings and last run start.

    The columns are job name, interval, frequent flag, enabled flag and
    the latest run start time ('-' when the job has no recorded run).
    """
    latest_runs = self.session.query(
        O2ORun.job_name, sqlalchemy.func.max(O2ORun.start_time)
    ).group_by(O2ORun.job_name).order_by(O2ORun.job_name)
    last_start = {run[0]: str(run[1]) for run in latest_runs}

    jobs = self.session.query(
        O2OJob.name, O2OJob.interval, O2OJob.enabled, O2OJob.frequent
    ).order_by(O2OJob.name).all()

    table = []
    for job in jobs:
        name = job[0]
        interval = '%5d' %job[1]
        # Flags are shown as Y only when the stored value equals 1.
        frequent = 'Y' if job[3] == 1 else 'N'
        enabled = 'Y' if job[2] == 1 else 'N'
        table.append([
            name,
            interval,
            '%4s' %frequent,
            '%4s' %enabled,
            last_start.get(name, '-'),
        ])

    headers = ['Job name','Interval','Frequent','Enabled','Last run start']
    print_table( headers, table )
def print_table
Definition: o2olib.py:58
def all
workaround iterator generators for ROOT classes
Definition: cmstools.py:25
#define str(s)
def o2olib.O2OJobMgr.runManager (   self)

Definition at line 145 of file o2olib.py.

References o2olib.O2OJobMgr.db_connection, crabFunctions.CrabController.logger, o2olib.O2OJobMgr.logger, querying.connection.session, and o2olib.O2OJobMgr.session.

def runManager(self):
    """Return a new ``O2ORunMgr`` built from this manager's connection string, session and logger."""
    manager = O2ORunMgr(self.db_connection, self.session, self.logger)
    return manager
def o2olib.O2OJobMgr.set (   self,
  job_name,
  en_flag,
  fr_val = None 
)

Definition at line 169 of file o2olib.py.

def set(self, job_name, en_flag, fr_val=None):
    """Update the ``enabled`` and/or ``frequent`` flag of an existing job.

    Args:
        job_name: name of the job to modify.
        en_flag: new ``enabled`` value, or None to leave it untouched.
        fr_val: new ``frequent`` value, or None to leave it untouched.

    Returns:
        None. An unknown job is logged as an error.
    """
    job_row = None
    for row in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        job_row = row
    if job_row is None:
        self.logger.error("A job called '%s' does not exist.", job_name)
        return

    # The query selects a single column, so each result is a one-element
    # row. Compare the stored value itself: comparing the row object with
    # the flag is always unequal and would rewrite and log an unchanged
    # state.
    current_enabled = job_row[0]
    if en_flag is not None and current_enabled != en_flag:
        job = O2OJob(name=job_name, enabled=en_flag)
        self.session.merge(job)
        self.session.commit()
        action = 'enabled' if en_flag else 'disabled'
        self.logger.info("Job '%s' %s." % (job_name, action))

    if fr_val is not None:
        job = O2OJob(name=job_name, frequent=fr_val)
        self.session.merge(job)
        self.session.commit()
        if fr_val == 1:
            self.logger.info("Job '%s' set 'frequent'" % job_name)
        else:
            self.logger.info("Job '%s' unset 'frequent'" % job_name)
def o2olib.O2OJobMgr.setConfig (   self,
  job_name,
  configJson 
)

Definition at line 194 of file o2olib.py.

Referenced by o2olib.O2OTool.execute().

def setConfig(self, job_name, configJson):
    """Store a new configuration version for an existing job.

    Args:
        job_name: name of the job.
        configJson: configuration text to store; an empty string is
            rejected.

    Returns:
        True when the configuration was committed, False when
        ``configJson`` is empty or the job does not exist.
    """
    if configJson == '':
        return False

    job_row = None
    for row in self.session.query(O2OJob.enabled).filter_by(name=job_name):
        job_row = row
    if job_row is None:
        self.logger.error("A job called '%s' does not exist.", job_name)
        # Report failure with an explicit False, like the empty-config
        # case, so the method always returns a boolean.
        return False

    config = O2OJobConf(job_name=job_name,
                        insertion_time=datetime.utcnow(),
                        configuration=configJson)
    self.session.add(config)
    self.session.commit()
    self.logger.info("New configuration inserted for job '%s'", job_name)
    return True
def o2olib.O2OJobMgr.setInterval (   self,
  job_name,
  int_val 
)

Definition at line 210 of file o2olib.py.

Referenced by o2olib.O2OTool.execute().

def setInterval(self, job_name, int_val):
    """Change the execution interval of an existing job.

    Args:
        job_name: name of the job to modify.
        int_val: new value for the job's ``interval`` column.

    Returns:
        None. An unknown job is logged as an error.
    """
    matches = list(self.session.query(O2OJob.enabled).filter_by(name=job_name))
    if not matches:
        self.logger.error( "A job called '%s' does not exist.", job_name )
        return

    # merge() applies the new interval to the job identified by its name.
    self.session.merge(O2OJob(name=job_name, interval=int_val))
    self.session.commit()
    self.logger.info( "The execution interval for job '%s' has been updated.", job_name )

Member Data Documentation

o2olib.O2OJobMgr.conf_dict

Definition at line 93 of file o2olib.py.

Referenced by o2olib.O2ORunMgr.executeJob().

o2olib.O2OJobMgr.db_connection

Definition at line 92 of file o2olib.py.

Referenced by o2olib.O2OJobMgr.runManager().

o2olib.O2OJobMgr.eng

Definition at line 106 of file o2olib.py.

Referenced by o2olib.O2OJobMgr.getSession().

o2olib.O2OJobMgr.logger

Definition at line 101 of file o2olib.py.

Referenced by o2olib.O2OJobMgr.runManager().

o2olib.O2OJobMgr.session

Definition at line 137 of file o2olib.py.

Referenced by o2olib.O2OJobMgr.runManager().

o2olib.O2OJobMgr.verbose

Definition at line 138 of file o2olib.py.

Referenced by python.diff_provenance.difference.list_diff(), python.diffProv.difference.list_diff(), python.diff_provenance.difference.module_diff(), python.diffProv.difference.module_diff(), python.diff_provenance.difference.onefilemodules(), python.diffProv.difference.onefilemodules(), core.TriggerMatchAnalyzer.TriggerMatchAnalyzer.process(), core.SkimAnalyzerCount.SkimAnalyzerCount.process(), objects.VertexAnalyzer.VertexAnalyzer.process(), and confdbOfflineConverter.OfflineConverter.query().