3 connection class translates either a connection string for sqlite, oracle of frontier into a connection object. 4 Also sets up ORM with SQLAlchemy. 6 connection class can also take a pre-constructed engine - useful for web services. 9 from __future__
import print_function
10 from __future__
import absolute_import
13 from sqlalchemy
import create_engine, text, or_
14 from sqlalchemy.orm
import sessionmaker
15 from sqlalchemy.pool
import NullPool
17 from .data_sources
import json_data_node
18 from copy
import deepcopy
29 connection_data =
None 30 netrc_authenticators =
None 34 Given a connection string, parses the connection string and connects. 37 def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
47 self.regexp.connection_object = self
49 if type(connection_data)
in [str, unicode]:
52 self.
schema = self.connection_data.get(
"schema")
if self.connection_data.get(
"schema") !=
None else "" 61 engine_string =
str(connection_data)
63 if "oracle" in engine_string:
65 elif "frontier" in engine_string:
67 elif "sqlite" in engine_string:
70 self.range.database_type = db_type
71 self.radius.database_type = db_type
72 self.regexp.database_type = db_type
74 from .
import models
as ms
80 Setup engine with given credentials from netrc file, and make a session maker. 97 if self.
models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
98 and str(self.
models[key].__name__) !=
"Base":
111 self.
models[key].connection = self
112 tmp_models_dict[key.lower()] = self.
models[key]
113 tmp_models_dict[key.lower()].empty =
False 115 self.
models = tmp_models_dict
123 return subprocess.Popen([
'cmsGetFnConnect',
'frontier://%s' % database], stdout = subprocess.PIPE).
communicate()[0].
strip()
125 raise Exception(
"Frontier connections can only be constructed when inside a CMSSW environment.")
130 Get database string for frontier. 133 return 'oracle+frontier://@%s/%s' % (urllib.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)
138 Get database string for oracle. 140 return 'oracle://%s:%s@%s' % (user, pwd, db_name)
145 Build the connection url, and get credentials from self.secrets dictionary. 148 database_url = connection._cms_oracle_string(user, pwd, db_name)
151 url = sqlalchemy.engine.url.make_url(database_url)
152 if url.password
is None:
154 except sqlalchemy.exc.ArgumentError:
155 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
160 database_url = connection._cms_frontier_string(db_name, schema)
163 url = sqlalchemy.engine.url.make_url(database_url)
164 except sqlalchemy.exc.ArgumentError:
166 Is this needed for a use case? 168 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
175 self.session.commit()
178 return "Couldn't tear down connection on engine %s." %
str(self.
engine)
185 self.engine.dispose()
190 if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta:
191 model_name = model_name.__name__
192 model_name = model_name.replace(
"_",
"")
193 return self.
models[model_name]
200 model_data = self.session.query(model)
201 for pk
in pk_to_value:
202 model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
203 return model_data.first()
206 return self.factory.object(
"globaltag", **pkargs)
209 return self.factory.object(
"globaltagmap", **pkargs)
211 """def global_tag_map_request(self, **pkargs): 212 return self.factory.object("globaltagmaprequest", **pkargs)""" 215 return self.factory.object(
"tag", **pkargs)
218 return self.factory.object(
"iov", **pkargs)
221 return self.factory.object(
"payload", **pkargs)
223 """def record(self, **pkargs): 224 return self.factory.object("record", **pkargs)""" 228 return "%%%s%%" % string
234 gt = self.
model(
"globaltag")
235 global_tags = self.session.query(gt).
filter(or_(
236 gt.name.ilike(string),
237 gt.description.ilike(string),
238 gt.release.ilike(string)
240 tag = self.
model(
"tag")
241 tags = self.session.query(tag).
filter(or_(
242 tag.name.ilike(string),
243 tag.object_type.ilike(string),
244 tag.description.ilike(string))
246 iov = self.
model(
"iov")
247 iovs = self.session.query(iov).
filter(or_(
248 iov.tag_name.ilike(string),
249 iov.since.ilike(string),
250 iov.payload_hash.ilike(string),
251 iov.insertion_time.ilike(string)
253 payload = self.
model(
"payload")
254 payloads = self.session.query(payload).
filter(or_(
255 payload.hash.ilike(string),
256 payload.object_type.ilike(string),
257 payload.insertion_time.ilike(string)
260 return json_data_node.make({
261 "global_tags" : global_tags.all(),
264 "payloads" : payloads.all()
269 self.session.add(new_object)
274 self.session.commit()
276 traceback.print_exc()
277 self.session.rollback()
280 if isinstance(object, list):
290 self.session.rollback()
292 traceback.print_exc()
293 print(
"Session couldn't be rolled back.")
297 Contains methods for creating objects. 306 from .data_sources
import json_list
307 from .models
import apply_filters
309 model = self.connection.model(class_name)
311 if self.connection.session ==
None:
315 model_data = self.connection.session.query(model)
316 if len(pkargs.items()) != 0:
319 amount = pkargs[
"amount"]
if "amount" in pkargs.keys()
else None 320 model_data = model_data.limit(amount)
321 if model_data.count() > 1:
323 return json_list(model_data.all())
324 elif model_data.count() == 1:
326 return model_data.first()
333 new_object.empty =
True 338 Returns a dictionary {login : ..., account : ..., password : ...} 341 headers = [
"login",
"account",
"password"]
342 authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
343 if authenticator_tuple ==
None:
344 raise Exception(
"netrc file must contain key '%s'." % key)
346 raise Exception(
"Couldn't get credentials from netrc file.")
347 return dict(
zip(headers, authenticator_tuple))
351 Function used to construct connection data dictionaries - internal to framework. 353 frontier_str_length = len(
"frontier://")
354 sqlite_str_length = len(
"sqlite://")
356 oracle_str_length = len(
"oracle://")
358 if type(connection_data)
in [str, unicode]
and connection_data[0:frontier_str_length] ==
"frontier://":
360 frontier://database_name/schema 362 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
363 schema = connection_data[frontier_str_length:].
split(
"/")[1]
365 connection_data[
"database_name"] = db_name
366 connection_data[
"schema"] = schema
367 connection_data[
"host"] =
"frontier" 368 connection_data[
"secrets"] =
None 369 elif type(connection_data)
in [str, unicode]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
371 sqlite://database_file_name 374 db_name = connection_data[sqlite_str_length:]
377 connection_data[
"database_name"] = os.path.abspath(db_name)
378 connection_data[
"schema"] = schema
379 connection_data[
"host"] =
"sqlite" 380 connection_data[
"secrets"] =
None 381 elif type(connection_data)
in [str, unicode]
and connection_data[0:oracle_str_length] ==
"oracle://":
383 oracle://account:password@database_name 385 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) 387 new_connection_string = connection_data[oracle_str_length:]
389 if ":" in new_connection_string:
391 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
392 schema_name = new_connection_string[0:new_connection_string.index(
":")]
394 username = new_connection_string[0:new_connection_string.index(
":")]
395 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
397 mode_to_netrc_key_suffix = {
"r" : "read", "w" : "write"}
398 database_name = new_connection_string[0:new_connection_string.index(
"/")]
399 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
401 username =
str(raw_input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
402 password =
str(raw_input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
404 if isinstance(secrets, str):
405 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
409 username = netrc_data[
"login"]
410 password = netrc_data[
"password"]
411 elif isinstance(secrets, dict):
412 username = secrets[
"user"]
413 password = secrets[
"password"]
415 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
420 connection_data[
"database_name"] = database_name
421 connection_data[
"schema"] = schema_name
422 connection_data[
"password"] = password
423 connection_data[
"host"] =
"oracle" 424 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
426 return connection_data
429 if dictionary[
"host"] !=
"sqlite":
430 if dictionary[
"host"] !=
"frontier":
433 user = dictionary[
"secrets"][
"login"]
434 pwd = dictionary[
"secrets"][
"password"]
437 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
439 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
444 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
446 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
449 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
452 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
454 Utility method for user - set up a connection object. 456 con =
connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)
def global_tag(self, pkargs)
def session_independent_object(object, schema=None)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
def build_frontier_url(db_name, schema)
def object(self, class_name, pkargs)
def object(self, model, pk_to_value)
S & print(S &os, JobReport::InputFile const &f)
static void * communicate(void *obj)
def model(self, model_name)
def write_and_commit(self, object)
def __init__(self, connection)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
def _cms_frontier_string(database, schema="cms_conditions")
def engine_from_dictionary(dictionary, pooling=True)
def payload(self, pkargs)
def build_oracle_url(user, pwd, db_name)
def _get_CMS_frontier_connection_string(database)
def new_connection_dictionary(connection_data, secrets=None, mode="r")
def apply_filters(orm_query, orm_class, filters)
def _oracle_match_format(self, string)
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
def search_everything(self, string, amount=10)
def _get_netrc_data(netrc_file, key)
def _cms_oracle_string(user, pwd, db_name)