CMS 3D CMS Logo

querying.py
Go to the documentation of this file.
1 """
2 
3 connection class translates either a connection string for sqlite, oracle or frontier into a connection object.
4 Also sets up ORM with SQLAlchemy.
5 
6 connection class can also take a pre-constructed engine - useful for web services.
7 
8 """
9 
10 import sqlalchemy
11 from sqlalchemy import create_engine, text, or_
12 from sqlalchemy.orm import sessionmaker
13 from sqlalchemy.pool import NullPool
14 import datetime
15 from .data_sources import json_data_node
16 from copy import deepcopy
17 from . import models
18 import traceback
19 import os
20 import netrc
21 import sys
22 
24  engine = None
25  connection = None
26  session = None
27  connection_data = None
28  netrc_authenticators = None
29  secrets = None
30  """
31 
32  Given a connection string, parses the connection string and connects.
33 
34  """
35  def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
36 
37  self._pooling = pooling
38 
39  # add querying utility properties
40  # these must belong to the connection since the way in which their values are handled
41  # depends on the database being connected to.
45  self.regexp.connection_object = self
46 
47  if type(connection_data) in [str, str]:
48  # if we've been given a connection string, process it
49  self.connection_data = new_connection_dictionary(connection_data, secrets=secrets, mode=mode)
50  self.schema = self.connection_data.get("schema") if self.connection_data.get("schema") != None else ""
51 
52  self.range.database_type = self.connection_data["host"]
53  self.radius.database_type = self.connection_data["host"]
54  self.regexp.database_type = self.connection_data["host"]
55  else:
56  self.connection_data = connection_data
57  # assume we have an engine
58  # we need to take the string representation so we know which type of db we're aiming at
59  engine_string = str(connection_data)
60  db_type = None
61  if "oracle" in engine_string:
62  db_type = "oracle"
63  elif "frontier" in engine_string:
64  db_type = "frontier"
65  elif "sqlite" in engine_string:
66  db_type = "sqlite"
67 
68  self.range.database_type = db_type
69  self.radius.database_type = db_type
70  self.regexp.database_type = db_type
71 
72  from . import models as ms
73  self.models = ms.generate(map_blobs)
74  #self.base = self.models["Base"]
75 
76  def setup(self):
77  """
78  Setup engine with given credentials from netrc file, and make a session maker.
79  """
80 
81  if type(self.connection_data) == dict:
83  else:
84  # we've been given an engine by the user
85  # use it as the engine
86  self.engine = self.connection_data
87 
88  self.sessionmaker = sessionmaker(bind=self.engine)
89  self.session = self.sessionmaker()
90  self.factory = factory(self)
91 
92  # assign correct schema for database name to each model
93  tmp_models_dict = {}
94  for key in self.models:
95  if self.models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
96  and str(self.models[key].__name__) != "Base":
97 
98  if type(self.connection_data) == dict:
99  # we can only extract the secrets and schema individually
100  # if we were given a dictionary... if we were given an engine
101  # we can't do this without parsing the connection string from the engine
102  # - a wide range of which it will be difficult to support!
103  self.models[key].__table__.schema = self.connection_data["schema"]
104  self.models[key].secrets = self.connection_data["secrets"]
105 
106  self.models[key].session = self.session
107  # isn't used anywhere - comment it out for now
108  #self.models[key].authentication = self.netrc_authenticators
109  self.models[key].connection = self
110  tmp_models_dict[key.lower()] = self.models[key]
111  tmp_models_dict[key.lower()].empty = False
112 
113  self.models = tmp_models_dict
114 
115  return self
116 
117  @staticmethod
119  try:
120  import subprocess
121  return subprocess.Popen(['cmsGetFnConnect', 'frontier://%s' % database], stdout = subprocess.PIPE).communicate()[0].strip()
122  except:
123  raise Exception("Frontier connections can only be constructed when inside a CMSSW environment.")
124 
125  @staticmethod
126  def _cms_frontier_string(database, schema="cms_conditions"):
127  """
128  Get database string for frontier.
129  """
130  import urllib.request, urllib.parse, urllib.error
131  return 'oracle+frontier://@%s/%s' % (urllib.parse.quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)
132 
133  @staticmethod
134  def _cms_oracle_string(user, pwd, db_name):
135  """
136  Get database string for oracle.
137  """
138  return 'oracle://%s:%s@%s' % (user, pwd, db_name)
139 
140  @staticmethod
141  def build_oracle_url(user, pwd, db_name):
142  """
143  Build the connection url, and get credentials from self.secrets dictionary.
144  """
145 
146  database_url = connection._cms_oracle_string(user, pwd, db_name)
147 
148  try:
149  url = sqlalchemy.engine.url.make_url(database_url)
150  if url.password is None:
151  url.password = pwd
152  except sqlalchemy.exc.ArgumentError:
153  url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
154  return url
155 
156  @staticmethod
157  def build_frontier_url(db_name, schema):
158  database_url = connection._cms_frontier_string(db_name, schema)
159 
160  try:
161  url = sqlalchemy.engine.url.make_url(database_url)
162  except sqlalchemy.exc.ArgumentError:
163  """
164  Is this needed for a use case?
165  """
166  url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
167  return url
168 
169  # currently just commits and closes the current session (ends transaction, closes connection)
170  # may do other things later
171  def tear_down(self):
172  try:
173  self.session.commit()
174  self.close_session()
175  except:
176  return "Couldn't tear down connection on engine %s." % str(self.engine)
177 
178  def close_session(self):
179  self.session.close()
180  return True
181 
182  def hard_close(self):
183  self.engine.dispose()
184  return True
185 
186  # get model based on given model name
187  def model(self, model_name):
188  if model_name.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta:
189  model_name = model_name.__name__
190  model_name = model_name.replace("_", "")
191  return self.models[model_name]
192 
193  # model should be the class the developer wants to be instantiated
194  # pk_to_value maps primary keys to values
195  def object(self, model, pk_to_value):
196  if self.session == None:
197  return None
198  model_data = self.session.query(model)
199  for pk in pk_to_value:
200  model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
201  return model_data.first()
202 
203  def global_tag(self, **pkargs):
204  return self.factory.object("globaltag", **pkargs)
205 
206  def global_tag_map(self, **pkargs):
207  return self.factory.object("globaltagmap", **pkargs)
208 
209  """def global_tag_map_request(self, **pkargs):
210  return self.factory.object("globaltagmaprequest", **pkargs)"""
211 
212  def tag(self, **pkargs):
213  return self.factory.object("tag", **pkargs)
214 
215  def tag_authorization(self, **pkargs):
216  return self.factory.object("tagauthorization", **pkargs)
217 
218  def iov(self, **pkargs):
219  return self.factory.object("iov", **pkargs)
220 
221  def payload(self, **pkargs):
222  return self.factory.object("payload", **pkargs)
223 
224  """def record(self, **pkargs):
225  return self.factory.object("record", **pkargs)"""
226 
227  # adds %% at the beginning and end so LIKE in SQL searches all of the string
228  def _oracle_match_format(self, string):
229  return "%%%s%%" % string
230 
231  # returns dictionary mapping object type to a list of all objects found in the search
232  def search_everything(self, string, amount=10):
233  string = self._oracle_match_format(string)
234 
235  gt = self.model("globaltag")
236  global_tags = self.session.query(gt).filter(or_(
237  gt.name.ilike(string),
238  gt.description.ilike(string),
239  gt.release.ilike(string)
240  )).limit(amount)
241  tag = self.model("tag")
242  tags = self.session.query(tag).filter(or_(
243  tag.name.ilike(string),
244  tag.object_type.ilike(string),
245  tag.description.ilike(string))
246  ).limit(amount)
247  iov = self.model("iov")
248  iovs = self.session.query(iov).filter(or_(
249  iov.tag_name.ilike(string),
250  iov.since.ilike(string),
251  iov.payload_hash.ilike(string),
252  iov.insertion_time.ilike(string)
253  )).limit(amount)
254  payload = self.model("payload")
255  payloads = self.session.query(payload).filter(or_(
256  payload.hash.ilike(string),
257  payload.object_type.ilike(string),
258  payload.insertion_time.ilike(string)
259  )).limit(amount)
260 
261  return json_data_node.make({
262  "global_tags" : global_tags.all(),
263  "tags" : tags.all(),
264  "iovs" : iovs.all(),
265  "payloads" : payloads.all()
266  })
267 
268  def write(self, object):
269  new_object = models.session_independent_object(object, schema=self.schema)
270  self.session.add(new_object)
271  return new_object
272 
273  def commit(self):
274  try:
275  self.session.commit()
276  except:
277  traceback.print_exc()
278  self.session.rollback()
279 
280  def write_and_commit(self, object):
281  if type(object) == list:
282  for item in object:
283  self.write_and_commit(item)
284  else:
285  # should be changed to deal with errors - add them to exception handling if they appear
286  self.write(object)
287  self.commit()
288 
289  def rollback(self):
290  try:
291  self.session.rollback()
292  except:
293  traceback.print_exc()
294  print("Session couldn't be rolled back.")
295 
class factory():
    """
    Contains methods for creating objects.
    """
    def __init__(self, connection):
        # the connection whose models and session are used by object()
        self.connection = connection

    def object(self, class_name, **pkargs):
        """
        Create or look up objects of the model named class_name.

        class_name is the name of the model to be used, as understood by the
        connection's model() method. pkargs are keyword arguments used as
        filters; the optional "amount" keyword limits the number of rows
        requested (it is also passed on to apply_filters with the others).

        Returns:
        - None if the connection has no session, or if nothing matched,
        - a new model instance with .empty set to True if no keyword arguments were given,
        - the single matching object if exactly one matched,
        - a json_list of the matching objects if more than one matched.
        """
        # get the class that self.connection holds from the class name
        model = self.connection.model(class_name)

        if self.connection.session is None:
            return None

        if not pkargs:
            # no column arguments were given, so return an empty object
            new_object = model()
            new_object.empty = True
            return new_object

        # only the filtered lookup needs these helpers
        from .data_sources import json_list
        from .models import apply_filters

        # apply the filters defined in **pkargs
        model_data = self.connection.session.query(model)
        model_data = apply_filters(model_data, model, **pkargs)
        model_data = model_data.limit(pkargs.get("amount"))

        # fetch once and branch on the number of rows, instead of issuing
        # separate count queries before fetching
        results = model_data.all()
        if len(results) > 1:
            # if we have multiple objects, return a json_list
            return json_list(results)
        if len(results) == 1:
            # if we have a single object, return that object
            return results[0]
        # if we have no objects returned, return None
        return None
336 
337 def _get_netrc_data(netrc_file, key):
338  """
339  Returns a dictionary {login : ..., account : ..., password : ...}
340  """
341  try:
342  headers = ["login", "account", "password"]
343  authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
344  if authenticator_tuple == None:
345  raise Exception("netrc file must contain key '%s'." % key)
346  except:
347  raise Exception("Couldn't get credentials from netrc file.")
348  return dict(list(zip(headers, authenticator_tuple)))
349 
def new_connection_dictionary(connection_data, secrets=None, mode="r"):
    """
    Function used to construct connection data dictionaries - internal to framework.

    Supported connection strings:
    - frontier://database_name/schema
    - sqlite://database_file_name
    - oracle://account:password@database_name
    - oracle://database_name/schema (requires a separate method of
      authentication: `secrets` is either a dictionary with "user" and
      "password" keys, the name of a netrc file, or None to be prompted)

    `mode` ("r" or "w") selects the netrc entry database_name/schema/read or
    database_name/schema/write when `secrets` is a netrc file name.

    Returns a dictionary with the keys "database_name", "schema", "host" and
    "secrets" (plus "password" for oracle). Anything that is not a string
    starting with one of the supported prefixes is returned unchanged.

    Raises Exception if `secrets` is neither None, a string nor a dictionary.
    """
    if not isinstance(connection_data, str):
        # not a connection string - nothing to parse
        return connection_data

    frontier_prefix = "frontier://"
    sqlite_prefix = "sqlite://"
    oracle_prefix = "oracle://"

    if connection_data.startswith(frontier_prefix):
        # frontier://database_name/schema
        name_and_schema = connection_data[len(frontier_prefix):].split("/")
        return {
            "database_name": name_and_schema[0],
            "schema": name_and_schema[1],
            "host": "frontier",
            "secrets": None,
        }

    if connection_data.startswith(sqlite_prefix):
        # for now, just support "sqlite://" format for sqlite connection strings
        db_name = connection_data[len(sqlite_prefix):]
        return {
            "database_name": os.path.abspath(db_name),
            "schema": "",
            "host": "sqlite",
            "secrets": None,
        }

    if connection_data.startswith(oracle_prefix):
        new_connection_string = connection_data[len(oracle_prefix):]

        if ":" in new_connection_string:
            # the user has given a password - usually in the case of the db upload service
            colon_index = new_connection_string.index(":")
            # split on the last "@" so that a password containing "@" stays intact
            at_index = new_connection_string.rindex("@")
            # the account name is used both as the username and as the schema
            username = new_connection_string[0:colon_index]
            schema_name = username
            password = new_connection_string[colon_index+1:at_index]
            database_name = new_connection_string[at_index+1:]
        else:
            mode_to_netrc_key_suffix = {"r" : "read", "w" : "write"}
            slash_index = new_connection_string.index("/")
            database_name = new_connection_string[0:slash_index]
            schema_name = new_connection_string[slash_index+1:]
            if secrets is None:
                # NOTE(review): input() echoes the typed password to the terminal
                username = str(input("Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
                password = str(input("Enter the password for the user '%s' in database '%s': " % (username, database_name)))
            elif isinstance(secrets, str):
                netrc_key = "%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
                netrc_data = _get_netrc_data(secrets, key=netrc_key)
                # take the username from the netrc entry corresponding to the mode the database is opened in
                # eg, if the user has given mode="r", the database_name/schema_name/read entry will be taken
                username = netrc_data["login"]
                password = netrc_data["password"]
            elif isinstance(secrets, dict):
                username = secrets["user"]
                password = secrets["password"]
            else:
                raise Exception("Invalid type given for secrets. Either an str or a dict must be given.")

        return {
            "database_name": database_name,
            "schema": schema_name,
            "password": password,
            "host": "oracle",
            "secrets": {"login" : username, "password" : password},
        }

    # unrecognised connection string - hand it back untouched
    return connection_data
428 
def engine_from_dictionary(dictionary, pooling=True):
    """
    Create an SQLAlchemy engine from a connection data dictionary.

    dictionary["host"] selects the kind of url: "sqlite" needs no
    authentication, "frontier" uses the database name and schema, and any
    other host is treated as oracle and authenticated with
    dictionary["secrets"]. Non-sqlite engines get a label length of 6; if
    pooling is False they are also created with NullPool. The pooling flag is
    not applied to sqlite engines.
    """
    host = dictionary["host"]

    if host == "sqlite":
        # if host is sqlite, making the url is easy - no authentication
        return create_engine("sqlite:///%s" % dictionary["database_name"])

    if host == "frontier":
        # if frontier, no need to authenticate
        url = connection.build_frontier_url(dictionary["database_name"], dictionary["schema"])
    else:
        # probably oracle - if not frontier, we have to authenticate
        user = dictionary["secrets"]["login"]
        pwd = dictionary["secrets"]["password"]
        url = connection.build_oracle_url(user, pwd, dictionary["database_name"])

    # set max label length for oracle and frontier
    engine_options = {"label_length": 6}
    if not pooling:
        engine_options["poolclass"] = NullPool
    return create_engine(url, **engine_options)
451 
452 
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
    """
    Utility method for user - construct a connection object from the given
    arguments, set it up, and return the result of its setup() call.
    """
    new_connection = connection(
        connection_data=connection_data,
        mode=mode,
        map_blobs=map_blobs,
        secrets=secrets,
        pooling=pooling,
    )
    return new_connection.setup()
def global_tag(self, pkargs)
Definition: querying.py:203
def session_independent_object(object, schema=None)
Definition: models.py:31
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:453
def rollback(self)
Definition: querying.py:289
def build_frontier_url(db_name, schema)
Definition: querying.py:157
def object(self, class_name, pkargs)
Definition: querying.py:306
def object(self, model, pk_to_value)
Definition: querying.py:195
static void * communicate(void *obj)
Definition: DQMNet.cc:1057
def model(self, model_name)
Definition: querying.py:187
void dispose(XMLCh *ptr)
def write_and_commit(self, object)
Definition: querying.py:280
def tag_authorization(self, pkargs)
Definition: querying.py:215
static std::string const input
Definition: EdmProvDump.cc:47
Definition: query.py:1
def hard_close(self)
Definition: querying.py:182
def __init__(self, connection)
Definition: querying.py:300
def setup(self)
Definition: querying.py:76
def close_session(self)
Definition: querying.py:178
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
Definition: querying.py:206
def write(self, object)
Definition: querying.py:268
def _cms_frontier_string(database, schema="cms_conditions")
Definition: querying.py:126
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def engine_from_dictionary(dictionary, pooling=True)
Definition: querying.py:429
def payload(self, pkargs)
Definition: querying.py:221
def build_oracle_url(user, pwd, db_name)
Definition: querying.py:141
def iov(self, pkargs)
Definition: querying.py:218
def _get_CMS_frontier_connection_string(database)
Definition: querying.py:118
def commit(self)
Definition: querying.py:273
def new_connection_dictionary(connection_data, secrets=None, mode="r")
Definition: querying.py:350
void add(std::map< std::string, TH1 *> &h, TH1 *hist)
def apply_filters(orm_query, orm_class, filters)
Definition: models.py:181
def _oracle_match_format(self, string)
Definition: querying.py:228
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
Definition: querying.py:35
def tear_down(self)
Definition: querying.py:171
def tag(self, pkargs)
Definition: querying.py:212
#define str(s)
def search_everything(self, string, amount=10)
Definition: querying.py:232
def _get_netrc_data(netrc_file, key)
Definition: querying.py:337
def _cms_oracle_string(user, pwd, db_name)
Definition: querying.py:134