3 connection class translates either a connection string for sqlite, oracle of frontier into a connection object. 4 Also sets up ORM with SQLAlchemy. 6 connection class can also take a pre-constructed engine - useful for web services. 11 from sqlalchemy
import create_engine, text, or_
12 from sqlalchemy.orm
import sessionmaker
13 from sqlalchemy.pool
import NullPool
15 from .data_sources
import json_data_node
16 from copy
import deepcopy
27 connection_data =
None 28 netrc_authenticators =
None 32 Given a connection string, parses the connection string and connects. 35 def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
45 self.
regexp.connection_object = self
47 if type(connection_data)
in [str, str]:
59 engine_string =
str(connection_data)
61 if "oracle" in engine_string:
63 elif "frontier" in engine_string:
65 elif "sqlite" in engine_string:
68 self.
range.database_type = db_type
69 self.
radius.database_type = db_type
70 self.
regexp.database_type = db_type
72 from .
import models
as ms
78 Setup engine with given credentials from netrc file, and make a session maker. 95 if self.
models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
96 and str(self.
models[key].__name__) !=
"Base":
109 self.
models[key].connection = self
110 tmp_models_dict[key.lower()] = self.
models[key]
111 tmp_models_dict[key.lower()].empty =
False 113 self.
models = tmp_models_dict
121 return subprocess.Popen([
'cmsGetFnConnect',
'frontier://%s' % database], stdout = subprocess.PIPE).
communicate()[0].
strip()
123 raise Exception(
"Frontier connections can only be constructed when inside a CMSSW environment.")
128 Get database string for frontier. 130 import urllib.request, urllib.parse, urllib.error
131 return 'oracle+frontier://@%s/%s' % (urllib.parse.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)
136 Get database string for oracle. 138 return 'oracle://%s:%s@%s' % (user, pwd, db_name)
143 Build the connection url, and get credentials from self.secrets dictionary. 146 database_url = connection._cms_oracle_string(user, pwd, db_name)
149 url = sqlalchemy.engine.url.make_url(database_url)
150 if url.password
is None:
152 except sqlalchemy.exc.ArgumentError:
153 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
158 database_url = connection._cms_frontier_string(db_name, schema)
161 url = sqlalchemy.engine.url.make_url(database_url)
162 except sqlalchemy.exc.ArgumentError:
164 Is this needed for a use case? 166 url = sqlalchemy.engine.url.make_url(
'sqlite:///%s' % db_name)
176 return "Couldn't tear down connection on engine %s." %
str(self.
engine)
188 if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta:
189 model_name = model_name.__name__
190 model_name = model_name.replace(
"_",
"")
191 return self.
models[model_name]
199 for pk
in pk_to_value:
200 model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
201 return model_data.first()
209 """def global_tag_map_request(self, **pkargs): 210 return self.factory.object("globaltagmaprequest", **pkargs)""" 224 """def record(self, **pkargs): 225 return self.factory.object("record", **pkargs)""" 229 return "%%%s%%" % string
235 gt = self.
model(
"globaltag")
237 gt.name.ilike(string),
238 gt.description.ilike(string),
239 gt.release.ilike(string)
241 tag = self.
model(
"tag")
243 tag.name.ilike(string),
244 tag.object_type.ilike(string),
245 tag.description.ilike(string))
247 iov = self.
model(
"iov")
249 iov.tag_name.ilike(string),
250 iov.since.ilike(string),
251 iov.payload_hash.ilike(string),
252 iov.insertion_time.ilike(string)
254 payload = self.
model(
"payload")
256 payload.hash.ilike(string),
257 payload.object_type.ilike(string),
258 payload.insertion_time.ilike(string)
261 return json_data_node.make({
262 "global_tags" : global_tags.all(),
265 "payloads" : payloads.all()
277 traceback.print_exc()
281 if type(object) == list:
293 traceback.print_exc()
294 print(
"Session couldn't be rolled back.")
298 Contains methods for creating objects. 307 from .data_sources
import json_list
308 from .models
import apply_filters
316 model_data = self.
connection.session.query(model)
317 if len(list(pkargs.items())) != 0:
320 amount = pkargs[
"amount"]
if "amount" in list(pkargs.keys())
else None 321 model_data = model_data.limit(amount)
322 if model_data.count() > 1:
324 return json_list(model_data.all())
325 elif model_data.count() == 1:
327 return model_data.first()
334 new_object.empty =
True 339 Returns a dictionary {login : ..., account : ..., password : ...} 342 headers = [
"login",
"account",
"password"]
343 authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
344 if authenticator_tuple ==
None:
345 raise Exception(
"netrc file must contain key '%s'." % key)
347 raise Exception(
"Couldn't get credentials from netrc file.")
348 return dict(list(
zip(headers, authenticator_tuple)))
352 Function used to construct connection data dictionaries - internal to framework. 354 frontier_str_length = len(
"frontier://")
355 sqlite_str_length = len(
"sqlite://")
357 oracle_str_length = len(
"oracle://")
359 if type(connection_data)
in [str, str]
and connection_data[0:frontier_str_length] ==
"frontier://":
361 frontier://database_name/schema 363 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
364 schema = connection_data[frontier_str_length:].
split(
"/")[1]
366 connection_data[
"database_name"] = db_name
367 connection_data[
"schema"] = schema
368 connection_data[
"host"] =
"frontier" 369 connection_data[
"secrets"] =
None 370 elif type(connection_data)
in [str, str]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
372 sqlite://database_file_name 375 db_name = connection_data[sqlite_str_length:]
378 connection_data[
"database_name"] = os.path.abspath(db_name)
379 connection_data[
"schema"] = schema
380 connection_data[
"host"] =
"sqlite" 381 connection_data[
"secrets"] =
None 382 elif type(connection_data)
in [str, str]
and connection_data[0:oracle_str_length] ==
"oracle://":
384 oracle://account:password@database_name 386 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) 388 new_connection_string = connection_data[oracle_str_length:]
390 if ":" in new_connection_string:
392 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
393 schema_name = new_connection_string[0:new_connection_string.index(
":")]
395 username = new_connection_string[0:new_connection_string.index(
":")]
396 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
398 mode_to_netrc_key_suffix = {
"r" : "read", "w" : "write"}
399 database_name = new_connection_string[0:new_connection_string.index(
"/")]
400 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
402 username =
str(
input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
403 password =
str(
input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
405 if type(secrets) == str:
406 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
410 username = netrc_data[
"login"]
411 password = netrc_data[
"password"]
412 elif type(secrets) == dict:
413 username = secrets[
"user"]
414 password = secrets[
"password"]
416 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
421 connection_data[
"database_name"] = database_name
422 connection_data[
"schema"] = schema_name
423 connection_data[
"password"] = password
424 connection_data[
"host"] =
"oracle" 425 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
427 return connection_data
430 if dictionary[
"host"] !=
"sqlite":
431 if dictionary[
"host"] !=
"frontier":
434 user = dictionary[
"secrets"][
"login"]
435 pwd = dictionary[
"secrets"][
"password"]
438 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
440 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
445 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
447 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
450 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
453 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
455 Utility method for user - set up a connection object. 457 con =
connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)
def global_tag(self, pkargs)
def session_independent_object(object, schema=None)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
def build_frontier_url(db_name, schema)
def object(self, class_name, pkargs)
def object(self, model, pk_to_value)
static void * communicate(void *obj)
def model(self, model_name)
def write_and_commit(self, object)
def tag_authorization(self, pkargs)
static std::string const input
def __init__(self, connection)
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
def _cms_frontier_string(database, schema="cms_conditions")
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def engine_from_dictionary(dictionary, pooling=True)
def split(sequence, size)
def payload(self, pkargs)
def build_oracle_url(user, pwd, db_name)
def _get_CMS_frontier_connection_string(database)
def new_connection_dictionary(connection_data, secrets=None, mode="r")
void add(std::map< std::string, TH1 *> &h, TH1 *hist)
def apply_filters(orm_query, orm_class, filters)
def _oracle_match_format(self, string)
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
def search_everything(self, string, amount=10)
def _get_netrc_data(netrc_file, key)
def _cms_oracle_string(user, pwd, db_name)