3 connection class translates either a connection string for sqlite, oracle of frontier into a connection object. 4 Also sets up ORM with SQLAlchemy. 6 connection class can also take a pre-constructed engine - useful for web services. 11 from sqlalchemy
import create_engine, text, or_
12 from sqlalchemy.orm
import sessionmaker
13 from sqlalchemy.pool
import NullPool
15 from data_sources
import json_data_node
16 from copy
import deepcopy
27 connection_data =
None 28 netrc_authenticators =
None 32 Given a connection string, parses the connection string and connects. 35 def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
45 self.regexp.connection_object = self
47 if type(connection_data)
in [str, unicode]:
50 self.
schema = self.connection_data.get(
"schema")
if self.connection_data.get(
"schema") !=
None else "" 59 engine_string =
str(connection_data)
61 if "oracle" in engine_string:
63 elif "frontier" in engine_string:
65 elif "sqlite" in engine_string:
68 self.range.database_type = db_type
69 self.radius.database_type = db_type
70 self.regexp.database_type = db_type
78 Setup engine with given credentials from netrc file, and make a session maker. 95 if self.
models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
96 and str(self.
models[key].__name__) !=
"Base":
109 self.
models[key].connection = self
110 tmp_models_dict[key.lower()] = self.
models[key]
111 tmp_models_dict[key.lower()].empty =
False 113 self.
models = tmp_models_dict
121 return subprocess.Popen([
'cmsGetFnConnect',
'frontier://%s' % database], stdout = subprocess.PIPE).
communicate()[0].
strip()
123 raise Exception(
"Frontier connections can only be constructed when inside a CMSSW environment.")
128 Get database string for frontier. 131 return 'oracle+frontier://@%s/%s' % (urllib.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)
136 Get database string for oracle. 138 return 'oracle://%s:%s@%s' % (user, pwd, db_name)
143 Build the connection url, and get credentials from self.secrets dictionary. 146 database_url = connection._cms_oracle_string(user, pwd, db_name)
149 url = sqlalchemy.engine.url.make_url(database_url)
150 if url.password
is None:
152 except sqlalchemy.exc.ArgumentError:
153 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
158 database_url = connection._cms_frontier_string(db_name, schema)
161 url = sqlalchemy.engine.url.make_url(database_url)
162 except sqlalchemy.exc.ArgumentError:
164 Is this needed for a use case? 166 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
173 self.session.commit()
176 return "Couldn't tear down connection on engine %s." %
str(self.
engine)
183 self.engine.dispose()
188 if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta:
189 model_name = model_name.__name__
190 model_name = model_name.replace(
"_",
"")
191 return self.
models[model_name]
198 model_data = self.session.query(model)
199 for pk
in pk_to_value:
200 model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
201 return model_data.first()
204 return self.factory.object(
"globaltag", **pkargs)
207 return self.factory.object(
"globaltagmap", **pkargs)
209 """def global_tag_map_request(self, **pkargs): 210 return self.factory.object("globaltagmaprequest", **pkargs)""" 213 return self.factory.object(
"tag", **pkargs)
216 return self.factory.object(
"iov", **pkargs)
219 return self.factory.object(
"payload", **pkargs)
221 """def record(self, **pkargs): 222 return self.factory.object("record", **pkargs)""" 226 return "%%%s%%" % string
232 gt = self.
model(
"globaltag")
233 global_tags = self.session.query(gt).
filter(or_(
234 gt.name.ilike(string),
235 gt.description.ilike(string),
236 gt.release.ilike(string)
238 tag = self.
model(
"tag")
239 tags = self.session.query(tag).
filter(or_(
240 tag.name.ilike(string),
241 tag.object_type.ilike(string),
242 tag.description.ilike(string))
244 iov = self.
model(
"iov")
245 iovs = self.session.query(iov).
filter(or_(
246 iov.tag_name.ilike(string),
247 iov.since.ilike(string),
248 iov.payload_hash.ilike(string),
249 iov.insertion_time.ilike(string)
251 payload = self.
model(
"payload")
252 payloads = self.session.query(payload).
filter(or_(
253 payload.hash.ilike(string),
254 payload.object_type.ilike(string),
255 payload.insertion_time.ilike(string)
258 return json_data_node.make({
259 "global_tags" : global_tags.all(),
262 "payloads" : payloads.all()
267 self.session.add(new_object)
272 self.session.commit()
274 traceback.print_exc()
275 self.session.rollback()
278 if isinstance(object, list):
288 self.session.rollback()
290 traceback.print_exc()
291 print(
"Session couldn't be rolled back.")
295 Contains methods for creating objects. 304 from data_sources
import json_list
305 from models
import apply_filters
307 model = self.connection.model(class_name)
309 if self.connection.session ==
None:
313 model_data = self.connection.session.query(model)
314 if len(pkargs.items()) != 0:
317 amount = pkargs[
"amount"]
if "amount" in pkargs.keys()
else None 318 model_data = model_data.limit(amount)
319 if model_data.count() > 1:
321 return json_list(model_data.all())
322 elif model_data.count() == 1:
324 return model_data.first()
331 new_object.empty =
True 336 Returns a dictionary {login : ..., account : ..., password : ...} 339 headers = [
"login",
"account",
"password"]
340 authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
341 if authenticator_tuple ==
None:
342 raise Exception(
"netrc file must contain key '%s'." % key)
344 raise Exception(
"Couldn't get credentials from netrc file.")
345 return dict(
zip(headers, authenticator_tuple))
349 Function used to construct connection data dictionaries - internal to framework. 351 frontier_str_length = len(
"frontier://")
352 sqlite_str_length = len(
"sqlite://")
354 oracle_str_length = len(
"oracle://")
356 if type(connection_data)
in [str, unicode]
and connection_data[0:frontier_str_length] ==
"frontier://":
358 frontier://database_name/schema 360 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
361 schema = connection_data[frontier_str_length:].
split(
"/")[1]
363 connection_data[
"database_name"] = db_name
364 connection_data[
"schema"] = schema
365 connection_data[
"host"] =
"frontier" 366 connection_data[
"secrets"] =
None 367 elif type(connection_data)
in [str, unicode]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
369 sqlite://database_file_name 372 db_name = connection_data[sqlite_str_length:]
375 connection_data[
"database_name"] = os.path.abspath(db_name)
376 connection_data[
"schema"] = schema
377 connection_data[
"host"] =
"sqlite" 378 connection_data[
"secrets"] =
None 379 elif type(connection_data)
in [str, unicode]
and connection_data[0:oracle_str_length] ==
"oracle://":
381 oracle://account:password@database_name 383 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) 385 new_connection_string = connection_data[oracle_str_length:]
387 if ":" in new_connection_string:
389 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
390 schema_name = new_connection_string[0:new_connection_string.index(
":")]
392 username = new_connection_string[0:new_connection_string.index(
":")]
393 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
395 mode_to_netrc_key_suffix = {
"r" : "read", "w" : "write"}
396 database_name = new_connection_string[0:new_connection_string.index(
"/")]
397 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
399 username =
str(raw_input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
400 password =
str(raw_input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
402 if isinstance(secrets, str):
403 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
407 username = netrc_data[
"login"]
408 password = netrc_data[
"password"]
409 elif isinstance(secrets, dict):
410 username = secrets[
"user"]
411 password = secrets[
"password"]
413 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
418 connection_data[
"database_name"] = database_name
419 connection_data[
"schema"] = schema_name
420 connection_data[
"password"] = password
421 connection_data[
"host"] =
"oracle" 422 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
424 return connection_data
427 if dictionary[
"host"] !=
"sqlite":
428 if dictionary[
"host"] !=
"frontier":
431 user = dictionary[
"secrets"][
"login"]
432 pwd = dictionary[
"secrets"][
"password"]
435 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
437 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
442 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
444 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
447 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
450 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
452 Utility method for user - set up a connection object. 454 con =
connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)
def global_tag(self, pkargs)
def session_independent_object(object, schema=None)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
def build_frontier_url(db_name, schema)
def object(self, class_name, pkargs)
def object(self, model, pk_to_value)
S & print(S &os, JobReport::InputFile const &f)
static void * communicate(void *obj)
def model(self, model_name)
def write_and_commit(self, object)
def __init__(self, connection)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
def _cms_frontier_string(database, schema="cms_conditions")
def engine_from_dictionary(dictionary, pooling=True)
def payload(self, pkargs)
def build_oracle_url(user, pwd, db_name)
def _get_CMS_frontier_connection_string(database)
def new_connection_dictionary(connection_data, secrets=None, mode="r")
def apply_filters(orm_query, orm_class, filters)
def _oracle_match_format(self, string)
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
def search_everything(self, string, amount=10)
def _get_netrc_data(netrc_file, key)
def _cms_oracle_string(user, pwd, db_name)