3 connection class translates either a connection string for sqlite, oracle of frontier into a connection object.
4 Also sets up ORM with SQLAlchemy.
6 connection class can also take a pre-constructed engine - useful for web services.
9 from __future__
import print_function
10 from __future__
import absolute_import
13 from sqlalchemy
import create_engine, text, or_
14 from sqlalchemy.orm
import sessionmaker
15 from sqlalchemy.pool
import NullPool
17 from .data_sources
import json_data_node
18 from copy
import deepcopy
29 connection_data =
None
30 netrc_authenticators =
None
34 Given a connection string, parses the connection string and connects.
37 def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
47 self.
regexp.connection_object = self
49 if type(connection_data)
in [str, unicode]:
61 engine_string =
str(connection_data)
63 if "oracle" in engine_string:
65 elif "frontier" in engine_string:
67 elif "sqlite" in engine_string:
70 self.
range.database_type = db_type
71 self.
radius.database_type = db_type
72 self.
regexp.database_type = db_type
74 from .
import models
as ms
80 Setup engine with given credentials from netrc file, and make a session maker.
97 if self.
models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
98 and str(self.
models[key].__name__) !=
"Base":
111 self.
models[key].connection = self
112 tmp_models_dict[key.lower()] = self.
models[key]
113 tmp_models_dict[key.lower()].empty =
False
115 self.
models = tmp_models_dict
123 return subprocess.Popen([
'cmsGetFnConnect',
'frontier://%s' % database], stdout = subprocess.PIPE).
communicate()[0].
strip()
125 raise Exception(
"Frontier connections can only be constructed when inside a CMSSW environment.")
130 Get database string for frontier.
133 return 'oracle+frontier://@%s/%s' % (urllib.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)
138 Get database string for oracle.
140 return 'oracle://%s:%s@%s' % (user, pwd, db_name)
145 Build the connection url, and get credentials from self.secrets dictionary.
148 database_url = connection._cms_oracle_string(user, pwd, db_name)
151 url = sqlalchemy.engine.url.make_url(database_url)
152 if url.password
is None:
154 except sqlalchemy.exc.ArgumentError:
155 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
160 database_url = connection._cms_frontier_string(db_name, schema)
163 url = sqlalchemy.engine.url.make_url(database_url)
164 except sqlalchemy.exc.ArgumentError:
166 Is this needed for a use case?
168 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
178 return "Couldn't tear down connection on engine %s." %
str(self.
engine)
190 if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta:
191 model_name = model_name.__name__
192 model_name = model_name.replace(
"_",
"")
193 return self.
models[model_name]
201 for pk
in pk_to_value:
202 model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
203 return model_data.first()
211 """def global_tag_map_request(self, **pkargs):
212 return self.factory.object("globaltagmaprequest", **pkargs)"""
223 """def record(self, **pkargs):
224 return self.factory.object("record", **pkargs)"""
228 return "%%%s%%" % string
234 gt = self.
model(
"globaltag")
236 gt.name.ilike(string),
237 gt.description.ilike(string),
238 gt.release.ilike(string)
240 tag = self.
model(
"tag")
242 tag.name.ilike(string),
243 tag.object_type.ilike(string),
244 tag.description.ilike(string))
246 iov = self.
model(
"iov")
248 iov.tag_name.ilike(string),
249 iov.since.ilike(string),
250 iov.payload_hash.ilike(string),
251 iov.insertion_time.ilike(string)
253 payload = self.
model(
"payload")
255 payload.hash.ilike(string),
256 payload.object_type.ilike(string),
257 payload.insertion_time.ilike(string)
260 return json_data_node.make({
261 "global_tags" : global_tags.all(),
264 "payloads" : payloads.all()
276 traceback.print_exc()
280 if isinstance(object, list):
292 traceback.print_exc()
293 print(
"Session couldn't be rolled back.")
297 Contains methods for creating objects.
306 from .data_sources
import json_list
307 from .models
import apply_filters
315 model_data = self.
connection.session.query(model)
316 if len(pkargs.items()) != 0:
319 amount = pkargs[
"amount"]
if "amount" in pkargs.keys()
else None
320 model_data = model_data.limit(amount)
321 if model_data.count() > 1:
323 return json_list(model_data.all())
324 elif model_data.count() == 1:
326 return model_data.first()
333 new_object.empty =
True
338 Returns a dictionary {login : ..., account : ..., password : ...}
341 headers = [
"login",
"account",
"password"]
342 authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
343 if authenticator_tuple ==
None:
344 raise Exception(
"netrc file must contain key '%s'." % key)
346 raise Exception(
"Couldn't get credentials from netrc file.")
347 return dict(
zip(headers, authenticator_tuple))
351 Function used to construct connection data dictionaries - internal to framework.
353 frontier_str_length = len(
"frontier://")
354 sqlite_str_length = len(
"sqlite://")
356 oracle_str_length = len(
"oracle://")
358 if type(connection_data)
in [str, unicode]
and connection_data[0:frontier_str_length] ==
"frontier://":
360 frontier://database_name/schema
362 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
363 schema = connection_data[frontier_str_length:].
split(
"/")[1]
365 connection_data[
"database_name"] = db_name
366 connection_data[
"schema"] = schema
367 connection_data[
"host"] =
"frontier"
368 connection_data[
"secrets"] =
None
369 elif type(connection_data)
in [str, unicode]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
371 sqlite://database_file_name
374 db_name = connection_data[sqlite_str_length:]
377 connection_data[
"database_name"] = os.path.abspath(db_name)
378 connection_data[
"schema"] = schema
379 connection_data[
"host"] =
"sqlite"
380 connection_data[
"secrets"] =
None
381 elif type(connection_data)
in [str, unicode]
and connection_data[0:oracle_str_length] ==
"oracle://":
383 oracle://account:password@database_name
385 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc)
387 new_connection_string = connection_data[oracle_str_length:]
389 if ":" in new_connection_string:
391 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
392 schema_name = new_connection_string[0:new_connection_string.index(
":")]
394 username = new_connection_string[0:new_connection_string.index(
":")]
395 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
397 mode_to_netrc_key_suffix = {
"r" :
"read",
"w" :
"write"}
398 database_name = new_connection_string[0:new_connection_string.index(
"/")]
399 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
401 username =
str(raw_input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
402 password =
str(raw_input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
404 if isinstance(secrets, str):
405 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
409 username = netrc_data[
"login"]
410 password = netrc_data[
"password"]
411 elif isinstance(secrets, dict):
412 username = secrets[
"user"]
413 password = secrets[
"password"]
415 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
420 connection_data[
"database_name"] = database_name
421 connection_data[
"schema"] = schema_name
422 connection_data[
"password"] = password
423 connection_data[
"host"] =
"oracle"
424 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
426 return connection_data
429 if dictionary[
"host"] !=
"sqlite":
430 if dictionary[
"host"] !=
"frontier":
433 user = dictionary[
"secrets"][
"login"]
434 pwd = dictionary[
"secrets"][
"password"]
437 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
439 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
444 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
446 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
449 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
452 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
454 Utility method for user - set up a connection object.
456 con =
connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)