3 connection class translates either a connection string for sqlite, oracle or frontier into a connection object. 4 Also sets up ORM with SQLAlchemy. 6 connection class can also take a pre-constructed engine - useful for web services. 9 from __future__
import print_function
12 from sqlalchemy
import create_engine, text, or_
13 from sqlalchemy.orm
import sessionmaker
14 from sqlalchemy.pool
import NullPool
16 from data_sources
import json_data_node
17 from copy
import deepcopy
28 connection_data =
None 29 netrc_authenticators =
None 33 Given a connection string, parses the connection string and connects. 36 def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
46 self.regexp.connection_object = self
48 if type(connection_data)
in [str, unicode]:
51 self.
schema = self.connection_data.get(
"schema")
if self.connection_data.get(
"schema") !=
None else "" 60 engine_string =
str(connection_data)
62 if "oracle" in engine_string:
64 elif "frontier" in engine_string:
66 elif "sqlite" in engine_string:
69 self.range.database_type = db_type
70 self.radius.database_type = db_type
71 self.regexp.database_type = db_type
79 Set up engine with given credentials from netrc file, and make a session maker. 96 if self.
models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
97 and str(self.
models[key].__name__) !=
"Base":
110 self.
models[key].connection = self
111 tmp_models_dict[key.lower()] = self.
models[key]
112 tmp_models_dict[key.lower()].empty =
False 114 self.
models = tmp_models_dict
122 return subprocess.Popen([
'cmsGetFnConnect',
'frontier://%s' % database], stdout = subprocess.PIPE).
communicate()[0].
strip()
124 raise Exception(
"Frontier connections can only be constructed when inside a CMSSW environment.")
129 Get database string for frontier. 132 return 'oracle+frontier://@%s/%s' % (urllib.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)
137 Get database string for oracle. 139 return 'oracle://%s:%s@%s' % (user, pwd, db_name)
144 Build the connection url, and get credentials from self.secrets dictionary. 147 database_url = connection._cms_oracle_string(user, pwd, db_name)
150 url = sqlalchemy.engine.url.make_url(database_url)
151 if url.password
is None:
153 except sqlalchemy.exc.ArgumentError:
154 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
159 database_url = connection._cms_frontier_string(db_name, schema)
162 url = sqlalchemy.engine.url.make_url(database_url)
163 except sqlalchemy.exc.ArgumentError:
165 Is this needed for a use case? 167 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
174 self.session.commit()
177 return "Couldn't tear down connection on engine %s." %
str(self.
engine)
184 self.engine.dispose()
189 if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta:
190 model_name = model_name.__name__
191 model_name = model_name.replace(
"_",
"")
192 return self.
models[model_name]
199 model_data = self.session.query(model)
200 for pk
in pk_to_value:
201 model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
202 return model_data.first()
205 return self.factory.object(
"globaltag", **pkargs)
208 return self.factory.object(
"globaltagmap", **pkargs)
210 """def global_tag_map_request(self, **pkargs): 211 return self.factory.object("globaltagmaprequest", **pkargs)""" 214 return self.factory.object(
"tag", **pkargs)
217 return self.factory.object(
"iov", **pkargs)
220 return self.factory.object(
"payload", **pkargs)
222 """def record(self, **pkargs): 223 return self.factory.object("record", **pkargs)""" 227 return "%%%s%%" % string
233 gt = self.
model(
"globaltag")
234 global_tags = self.session.query(gt).
filter(or_(
235 gt.name.ilike(string),
236 gt.description.ilike(string),
237 gt.release.ilike(string)
239 tag = self.
model(
"tag")
240 tags = self.session.query(tag).
filter(or_(
241 tag.name.ilike(string),
242 tag.object_type.ilike(string),
243 tag.description.ilike(string))
245 iov = self.
model(
"iov")
246 iovs = self.session.query(iov).
filter(or_(
247 iov.tag_name.ilike(string),
248 iov.since.ilike(string),
249 iov.payload_hash.ilike(string),
250 iov.insertion_time.ilike(string)
252 payload = self.
model(
"payload")
253 payloads = self.session.query(payload).
filter(or_(
254 payload.hash.ilike(string),
255 payload.object_type.ilike(string),
256 payload.insertion_time.ilike(string)
259 return json_data_node.make({
260 "global_tags" : global_tags.all(),
263 "payloads" : payloads.all()
268 self.session.add(new_object)
273 self.session.commit()
275 traceback.print_exc()
276 self.session.rollback()
279 if isinstance(object, list):
289 self.session.rollback()
291 traceback.print_exc()
292 print(
"Session couldn't be rolled back.")
296 Contains methods for creating objects. 305 from data_sources
import json_list
306 from models
import apply_filters
308 model = self.connection.model(class_name)
310 if self.connection.session ==
None:
314 model_data = self.connection.session.query(model)
315 if len(pkargs.items()) != 0:
318 amount = pkargs[
"amount"]
if "amount" in pkargs.keys()
else None 319 model_data = model_data.limit(amount)
320 if model_data.count() > 1:
322 return json_list(model_data.all())
323 elif model_data.count() == 1:
325 return model_data.first()
332 new_object.empty =
True 337 Returns a dictionary {login : ..., account : ..., password : ...} 340 headers = [
"login",
"account",
"password"]
341 authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
342 if authenticator_tuple ==
None:
343 raise Exception(
"netrc file must contain key '%s'." % key)
345 raise Exception(
"Couldn't get credentials from netrc file.")
346 return dict(
zip(headers, authenticator_tuple))
350 Function used to construct connection data dictionaries - internal to framework. 352 frontier_str_length = len(
"frontier://")
353 sqlite_str_length = len(
"sqlite://")
355 oracle_str_length = len(
"oracle://")
357 if type(connection_data)
in [str, unicode]
and connection_data[0:frontier_str_length] ==
"frontier://":
359 frontier://database_name/schema 361 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
362 schema = connection_data[frontier_str_length:].
split(
"/")[1]
364 connection_data[
"database_name"] = db_name
365 connection_data[
"schema"] = schema
366 connection_data[
"host"] =
"frontier" 367 connection_data[
"secrets"] =
None 368 elif type(connection_data)
in [str, unicode]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
370 sqlite://database_file_name 373 db_name = connection_data[sqlite_str_length:]
376 connection_data[
"database_name"] = os.path.abspath(db_name)
377 connection_data[
"schema"] = schema
378 connection_data[
"host"] =
"sqlite" 379 connection_data[
"secrets"] =
None 380 elif type(connection_data)
in [str, unicode]
and connection_data[0:oracle_str_length] ==
"oracle://":
382 oracle://account:password@database_name 384 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) 386 new_connection_string = connection_data[oracle_str_length:]
388 if ":" in new_connection_string:
390 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
391 schema_name = new_connection_string[0:new_connection_string.index(
":")]
393 username = new_connection_string[0:new_connection_string.index(
":")]
394 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
396 mode_to_netrc_key_suffix = {
"r" : "read", "w" : "write"}
397 database_name = new_connection_string[0:new_connection_string.index(
"/")]
398 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
400 username =
str(raw_input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
401 password =
str(raw_input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
403 if isinstance(secrets, str):
404 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
408 username = netrc_data[
"login"]
409 password = netrc_data[
"password"]
410 elif isinstance(secrets, dict):
411 username = secrets[
"user"]
412 password = secrets[
"password"]
414 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
419 connection_data[
"database_name"] = database_name
420 connection_data[
"schema"] = schema_name
421 connection_data[
"password"] = password
422 connection_data[
"host"] =
"oracle" 423 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
425 return connection_data
428 if dictionary[
"host"] !=
"sqlite":
429 if dictionary[
"host"] !=
"frontier":
432 user = dictionary[
"secrets"][
"login"]
433 pwd = dictionary[
"secrets"][
"password"]
436 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
438 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
443 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
445 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
448 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
451 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
453 Utility method for user - set up a connection object. 455 con =
connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)
def global_tag(self, pkargs)
def session_independent_object(object, schema=None)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
def build_frontier_url(db_name, schema)
def object(self, class_name, pkargs)
def object(self, model, pk_to_value)
S & print(S &os, JobReport::InputFile const &f)
static void * communicate(void *obj)
def model(self, model_name)
def write_and_commit(self, object)
def __init__(self, connection)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
def _cms_frontier_string(database, schema="cms_conditions")
def engine_from_dictionary(dictionary, pooling=True)
def payload(self, pkargs)
def build_oracle_url(user, pwd, db_name)
def _get_CMS_frontier_connection_string(database)
def new_connection_dictionary(connection_data, secrets=None, mode="r")
def apply_filters(orm_query, orm_class, filters)
def _oracle_match_format(self, string)
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
def search_everything(self, string, amount=10)
def _get_netrc_data(netrc_file, key)
def _cms_oracle_string(user, pwd, db_name)