CMS 3D CMS Logo

querying.py
Go to the documentation of this file.
1 """
2 
3 connection class translates a connection string for sqlite, oracle or frontier into a connection object.
4 Also sets up ORM with SQLAlchemy.
5 
6 connection class can also take a pre-constructed engine - useful for web services.
7 
8 """
9 from __future__ import print_function
10 
11 import sqlalchemy
12 from sqlalchemy import create_engine, text, or_
13 from sqlalchemy.orm import sessionmaker
14 from sqlalchemy.pool import NullPool
15 import datetime
16 from data_sources import json_data_node
17 from copy import deepcopy
18 import models
19 import traceback
20 import os
21 import netrc
22 import sys
23 
25  engine = None
26  connection = None
27  session = None
28  connection_data = None
29  netrc_authenticators = None
30  secrets = None
31  """
32 
33  Given a connection string, parses the connection string and connects.
34 
35  """
36  def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
37 
38  self._pooling = pooling
39 
40  # add querying utility properties
41  # these must belong to the connection since the way in which their values are handled
42  # depends on the database being connected to.
46  self.regexp.connection_object = self
47 
48  if type(connection_data) in [str, unicode]:
49  # if we've been given a connection string, process it
50  self.connection_data = new_connection_dictionary(connection_data, secrets=secrets, mode=mode)
51  self.schema = self.connection_data.get("schema") if self.connection_data.get("schema") != None else ""
52 
53  self.range.database_type = self.connection_data["host"]
54  self.radius.database_type = self.connection_data["host"]
55  self.regexp.database_type = self.connection_data["host"]
56  else:
57  self.connection_data = connection_data
58  # assume we have an engine
59  # we need to take the string representation so we know which type of db we're aiming at
60  engine_string = str(connection_data)
61  db_type = None
62  if "oracle" in engine_string:
63  db_type = "oracle"
64  elif "frontier" in engine_string:
65  db_type = "frontier"
66  elif "sqlite" in engine_string:
67  db_type = "sqlite"
68 
69  self.range.database_type = db_type
70  self.radius.database_type = db_type
71  self.regexp.database_type = db_type
72 
73  import models as ms
74  self.models = ms.generate(map_blobs)
75  #self.base = self.models["Base"]
76 
77  def setup(self):
78  """
79  Setup engine with given credentials from netrc file, and make a session maker.
80  """
81 
82  if isinstance(self.connection_data, dict):
84  else:
85  # we've been given an engine by the user
86  # use it as the engine
87  self.engine = self.connection_data
88 
89  self.sessionmaker = sessionmaker(bind=self.engine)
90  self.session = self.sessionmaker()
91  self.factory = factory(self)
92 
93  # assign correct schema for database name to each model
94  tmp_models_dict = {}
95  for key in self.models:
96  if self.models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
97  and str(self.models[key].__name__) != "Base":
98 
99  if isinstance(self.connection_data, dict):
100  # we can only extract the secrets and schema individually
101  # if we were given a dictionary... if we were given an engine
102  # we can't do this without parsing the connection string from the engine
103  # - a wide range of which it will be difficult to support!
104  self.models[key].__table__.schema = self.connection_data["schema"]
105  self.models[key].secrets = self.connection_data["secrets"]
106 
107  self.models[key].session = self.session
108  # isn't used anywhere - comment it out for now
109  #self.models[key].authentication = self.netrc_authenticators
110  self.models[key].connection = self
111  tmp_models_dict[key.lower()] = self.models[key]
112  tmp_models_dict[key.lower()].empty = False
113 
114  self.models = tmp_models_dict
115 
116  return self
117 
@staticmethod
def _get_CMS_frontier_connection_string(database):
    """
    Resolve a frontier database name into its connection string.

    Runs the external command 'cmsGetFnConnect frontier://<database>' and returns
    whatever the command writes to stdout, with surrounding whitespace stripped.

    Raises Exception if running the command fails.
    """
    import subprocess
    command = ['cmsGetFnConnect', 'frontier://%s' % database]
    try:
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        return process.communicate()[0].strip()
    except Exception:
        # not a bare except: Ctrl-C and SystemExit must propagate instead of
        # being reported as a missing CMSSW environment
        raise Exception("Frontier connections can only be constructed when inside a CMSSW environment.")
125 
@staticmethod
def _cms_frontier_string(database, schema="cms_conditions"):
    """
    Get database string for frontier.

    The frontier connection string for `database` is URL-quoted and placed in the
    host part of an 'oracle+frontier://' url, followed by `schema`.
    """
    # quote_plus lives in urllib on Python 2 and in urllib.parse on Python 3
    try:
        from urllib import quote_plus
    except ImportError:
        from urllib.parse import quote_plus
    frontier_connection_string = connection._get_CMS_frontier_connection_string(database)
    return 'oracle+frontier://@%s/%s' % (quote_plus(frontier_connection_string), schema)
133 
134  @staticmethod
135  def _cms_oracle_string(user, pwd, db_name):
136  """
137  Get database string for oracle.
138  """
139  return 'oracle://%s:%s@%s' % (user, pwd, db_name)
140 
@staticmethod
def build_oracle_url(user, pwd, db_name):
    """
    Build the SQLAlchemy connection url for an oracle database from the given
    user name, password and database name.

    If the url string cannot be parsed by SQLAlchemy (ArgumentError), db_name is
    treated as a sqlite file name and a sqlite url is returned instead.
    """

    database_url = connection._cms_oracle_string(user, pwd, db_name)

    try:
        url = sqlalchemy.engine.url.make_url(database_url)
        if url.password is None:
            if hasattr(url, "set"):
                # SQLAlchemy >= 1.4: URL objects are immutable, set() returns a modified copy
                url = url.set(password=pwd)
            else:
                url.password = pwd
    except sqlalchemy.exc.ArgumentError:
        url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
    return url
156 
@staticmethod
def build_frontier_url(db_name, schema):
    """
    Build the SQLAlchemy connection url for a frontier database and schema.

    If the frontier url string cannot be parsed by SQLAlchemy (ArgumentError),
    a sqlite url for db_name is returned instead.
    """
    make_url = sqlalchemy.engine.url.make_url
    frontier_url_string = connection._cms_frontier_string(db_name, schema)

    try:
        return make_url(frontier_url_string)
    except sqlalchemy.exc.ArgumentError:
        # Is this needed for a use case?
        return make_url('sqlite:///%s' % db_name)
169 
# currently just commits and closes the current session (ends transaction, closes connection)
# may do other things later
def tear_down(self):
    """
    Commit the current session and close it.

    Returns None on success. If committing or closing raises, the error is not
    propagated; a message string naming the engine is returned instead.
    """
    try:
        self.session.commit()
        self.close_session()
    except Exception:
        # only ordinary errors are turned into the message - Ctrl-C and SystemExit propagate
        return "Couldn't tear down connection on engine %s." % str(self.engine)
178 
def close_session(self):
    """
    Close the current session. Always returns True.
    """
    current_session = self.session
    current_session.close()
    return True
182 
def hard_close(self):
    """
    Dispose of the engine held by this connection. Always returns True.
    """
    current_engine = self.engine
    current_engine.dispose()
    return True
186 
# get model based on given model name
def model(self, model_name):
    """
    Look up a model class in self.models.

    model_name may be a string or a declarative model class; for a class, its
    __name__ is used. Underscores are removed from the name before the lookup.
    """
    declarative_meta = sqlalchemy.ext.declarative.api.DeclarativeMeta
    lookup_name = model_name.__name__ if model_name.__class__ == declarative_meta else model_name
    return self.models[lookup_name.replace("_", "")]
193 
# model should be the class the developer wants to be instantiated
# pk_to_value maps primary keys to values
def object(self, model, pk_to_value):
    """
    Query for the first row of `model` matching every key/value pair in pk_to_value.

    Each key must be the name of an attribute defined directly on the model class.
    Returns None when there is no session; otherwise whatever the query's
    first() returns.
    """
    if self.session is None:
        return None
    model_data = self.session.query(model)
    for pk, value in pk_to_value.items():
        model_data = model_data.filter(model.__dict__[pk] == value)
    return model_data.first()
203 
204  def global_tag(self, **pkargs):
205  return self.factory.object("globaltag", **pkargs)
206 
def global_tag_map(self, **pkargs):
    """
    Shortcut for self.factory.object("globaltagmap", **pkargs).
    """
    create = self.factory.object
    return create("globaltagmap", **pkargs)
209 
210  """def global_tag_map_request(self, **pkargs):
211  return self.factory.object("globaltagmaprequest", **pkargs)"""
212 
213  def tag(self, **pkargs):
214  return self.factory.object("tag", **pkargs)
215 
def iov(self, **pkargs):
    """
    Shortcut for self.factory.object("iov", **pkargs).
    """
    create = self.factory.object
    return create("iov", **pkargs)
218 
def payload(self, **pkargs):
    """
    Shortcut for self.factory.object("payload", **pkargs).
    """
    create = self.factory.object
    return create("payload", **pkargs)
221 
222  """def record(self, **pkargs):
223  return self.factory.object("record", **pkargs)"""
224 
225  # adds %% at the beginning and end so LIKE in SQL searches all of the string
226  def _oracle_match_format(self, string):
227  return "%%%s%%" % string
228 
# returns dictionary mapping object type to a list of all objects found in the search
def search_everything(self, string, amount=10):
    """
    Case-insensitive substring search over global tags, tags, iovs and payloads.

    For each of the four models, a row matches if any of the searched columns
    contains `string`; each query is limited to `amount` rows. The four result
    lists are passed to json_data_node.make under the keys "global_tags",
    "tags", "iovs" and "payloads".
    """
    pattern = self._oracle_match_format(string)

    def find(model_name, column_names):
        # build (without running yet) the limited OR-of-ILIKE query for one model
        model_class = self.model(model_name)
        criteria = [getattr(model_class, column_name).ilike(pattern) for column_name in column_names]
        return self.session.query(model_class).filter(or_(*criteria)).limit(amount)

    global_tags = find("globaltag", ["name", "description", "release"])
    tags = find("tag", ["name", "object_type", "description"])
    iovs = find("iov", ["tag_name", "since", "payload_hash", "insertion_time"])
    payloads = find("payload", ["hash", "object_type", "insertion_time"])

    # the queries are executed here, in this order
    return json_data_node.make({
        "global_tags" : global_tags.all(),
        "tags" : tags.all(),
        "iovs" : iovs.all(),
        "payloads" : payloads.all()
    })
265 
def write(self, object):
    """
    Add an object to the current session without committing.

    The object is first passed through models.session_independent_object with
    this connection's schema; the object returned by that call is what gets
    added to the session and returned.
    """
    session_free_object = models.session_independent_object(object, schema=self.schema)
    self.session.add(session_free_object)
    return session_free_object
270 
def commit(self):
    """
    Commit the current session.

    If the commit raises, the traceback is printed and the session is rolled
    back; the error is not propagated.
    """
    try:
        self.session.commit()
    except Exception:
        # only ordinary errors are handled here - Ctrl-C and SystemExit propagate
        traceback.print_exc()
        self.session.rollback()
277 
def write_and_commit(self, object):
    """
    Write an object and commit it straight away.

    A list is handled element by element (nested lists included), so every
    non-list item gets its own write followed by its own commit.
    """
    if not isinstance(object, list):
        # should be changed to deal with errors - add them to exception handling if they appear
        self.write(object)
        self.commit()
        return
    for element in object:
        self.write_and_commit(element)
286 
def rollback(self):
    """
    Roll back the current session.

    If the rollback itself raises, the traceback and a short message are
    printed; the error is not propagated.
    """
    try:
        self.session.rollback()
    except Exception:
        # only ordinary errors are handled here - Ctrl-C and SystemExit propagate
        traceback.print_exc()
        print("Session couldn't be rolled back.")
293 
class factory():
    """
    Contains methods for creating objects.
    """
    def __init__(self, connection):
        # the connection whose model lookup and session are used by object()
        self.connection = connection

    # class_name is the class name of the model to be used
    # pkargs is a dictionary of keyword arguments used as primary key values
    # this dictionary will be used to populate the object of type name class_name
    def object(self, class_name, **pkargs):
        """
        Create or look up an object of the model called class_name.

        Returns:
        - None if the connection has no session, or if no row matches,
        - a new instance of the model with empty = True if no keyword arguments were given,
        - the matching object if exactly one row matches,
        - a json_list of the matching objects if more than one row matches.

        The keyword arguments are handed to models.apply_filters; "amount",
        if given, is also used as the row limit of the query.
        """
        # get the class that self.connection holds from the class name
        model = self.connection.model(class_name)

        session = self.connection.session
        if session is None:
            return None

        if not pkargs:
            # no column arguments were given, so return an empty object
            new_object = model()
            new_object.empty = True
            return new_object

        # only the filtered lookup needs these, so they are imported here
        from data_sources import json_list
        from models import apply_filters

        # apply the filters defined in **pkargs
        model_data = apply_filters(session.query(model), model, **pkargs)
        model_data = model_data.limit(pkargs.get("amount"))

        # fetch the rows once and branch on how many came back, rather than
        # issuing separate COUNT queries before fetching
        results = model_data.all()
        if len(results) > 1:
            # if we have multiple objects, return a json_list
            return json_list(results)
        elif len(results) == 1:
            # if we have a single object, return that object
            return results[0]
        # if we have no objects returned, return None
        return None
334 
def _get_netrc_data(netrc_file, key):
    """
    Returns a dictionary {login : ..., account : ..., password : ...}
    for the entry `key` of the netrc file `netrc_file`.

    Raises Exception if the netrc file cannot be read or parsed, and a separate
    Exception naming the key if the file has no entry for it.
    """
    headers = ["login", "account", "password"]
    try:
        authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
    except Exception:
        raise Exception("Couldn't get credentials from netrc file.")
    # checked outside the try block so that this more specific message is not
    # replaced by the generic one above
    if authenticator_tuple is None:
        raise Exception("netrc file must contain key '%s'." % key)
    return dict(zip(headers, authenticator_tuple))
347 
def new_connection_dictionary(connection_data, secrets=None, mode="r"):
    """
    Function used to construct connection data dictionaries - internal to framework.

    Recognised connection strings:
        frontier://database_name/schema
        sqlite://database_file_name
        oracle://account:password@database_name
        oracle://database_name/schema

    The last form needs separate credentials. secrets may be:
        None - the user name and password are asked for interactively,
        a string - the name of a netrc file; the entry "database_name/schema/read"
                   or "database_name/schema/write" is used, depending on mode,
        a dictionary - with the keys "user" and "password".

    mode is "r" or "w" (None is treated as "r") and is only used to pick the netrc entry.

    Returns a dictionary with the keys "database_name", "schema", "host" and
    "secrets" (and "password" for oracle). Anything that is not one of the
    recognised connection strings is returned unchanged.
    """
    # unicode and raw_input only exist in Python 2
    try:
        string_types = (str, unicode)
        read_input = raw_input
    except NameError:
        string_types = (str,)
        read_input = input

    if not isinstance(connection_data, string_types):
        return connection_data

    frontier_prefix = "frontier://"
    sqlite_prefix = "sqlite://"
    oracle_prefix = "oracle://"

    if connection_data.startswith(frontier_prefix):
        # frontier://database_name/schema
        parts = connection_data[len(frontier_prefix):].split("/")
        return {
            "database_name" : parts[0],
            "schema" : parts[1],
            "host" : "frontier",
            "secrets" : None
        }

    if connection_data.startswith(sqlite_prefix):
        # sqlite://database_file_name
        # for now, just support "sqlite://" format for sqlite connection strings
        db_name = connection_data[len(sqlite_prefix):]
        return {
            "database_name" : os.path.abspath(db_name),
            "schema" : "",
            "host" : "sqlite",
            "secrets" : None
        }

    if not connection_data.startswith(oracle_prefix):
        return connection_data

    new_connection_string = connection_data[len(oracle_prefix):]

    if ":" in new_connection_string:
        # the user has given a password - usually in the case of the db upload service
        if "@" not in new_connection_string:
            raise ValueError("Oracle connection strings with a password must have the form oracle://account:password@database_name")
        # split at the LAST '@' so that a password containing '@' stays in one piece
        credentials, _, database_name = new_connection_string.rpartition("@")
        username, _, password = credentials.partition(":")
        # the account name doubles as the schema name
        schema_name = username
    else:
        database_name, separator, schema_name = new_connection_string.partition("/")
        if not separator:
            raise ValueError("Oracle connection strings without a password must have the form oracle://database_name/schema")

        if secrets is None:
            username = str(read_input("Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
            password = str(read_input("Enter the password for the user '%s' in database '%s': " % (username, database_name)))
        elif isinstance(secrets, string_types):
            mode_to_netrc_key_suffix = {"r" : "read", "w" : "write"}
            # callers that don't specify a mode get the function's own default, read
            netrc_key_suffix = mode_to_netrc_key_suffix["r" if mode is None else mode]
            netrc_key = "%s/%s/%s" % (database_name, schema_name, netrc_key_suffix)
            netrc_data = _get_netrc_data(secrets, key=netrc_key)
            # take the username from the netrc entry corresponding to the mode the database is opened in
            # eg, if the user has given mode="r", the database_name/schema_name/read entry will be taken
            username = netrc_data["login"]
            password = netrc_data["password"]
        elif isinstance(secrets, dict):
            username = secrets["user"]
            password = secrets["password"]
        else:
            raise Exception("Invalid type given for secrets. Either an str or a dict must be given.")

    return {
        "database_name" : database_name,
        "schema" : schema_name,
        "password" : password,
        "host" : "oracle",
        "secrets" : {"login" : username, "password" : password}
    }
426 
def engine_from_dictionary(dictionary, pooling=True):
    """
    Create a SQLAlchemy engine from a connection data dictionary.

    The "host" entry selects the kind of database: "sqlite", "frontier", or
    anything else, which is treated as oracle and authenticated with the
    "secrets" entry. With pooling=False, frontier and oracle engines are
    created with NullPool; sqlite engines ignore the pooling argument.
    """
    host = dictionary["host"]

    if host == "sqlite":
        # if host is sqlite, making the url is easy - no authentication
        return create_engine("sqlite:///%s" % dictionary["database_name"])

    # set max label length for oracle and frontier
    engine_options = {"label_length" : 6}
    if not pooling:
        engine_options["poolclass"] = NullPool

    if host == "frontier":
        # if frontier, no need to authenticate
        url = connection.build_frontier_url(dictionary["database_name"], dictionary["schema"])
    else:
        # probably oracle
        # if not frontier, we have to authenticate
        user = dictionary["secrets"]["login"]
        pwd = dictionary["secrets"]["password"]
        url = connection.build_oracle_url(user, pwd, dictionary["database_name"])

    return create_engine(url, **engine_options)
449 
450 
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
    """
    Utility method for user - build a connection object from the given
    arguments, call setup() on it, and return the result of setup().
    """
    new_connection = connection(
        connection_data=connection_data,
        mode=mode,
        map_blobs=map_blobs,
        secrets=secrets,
        pooling=pooling
    )
    return new_connection.setup()
def global_tag(self, pkargs)
Definition: querying.py:204
def session_independent_object(object, schema=None)
Definition: models.py:32
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:451
def rollback(self)
Definition: querying.py:287
def build_frontier_url(db_name, schema)
Definition: querying.py:158
def object(self, class_name, pkargs)
Definition: querying.py:304
def object(self, model, pk_to_value)
Definition: querying.py:196
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
static void * communicate(void *obj)
Definition: DQMNet.cc:1251
def model(self, model_name)
Definition: querying.py:188
def write_and_commit(self, object)
Definition: querying.py:278
def hard_close(self)
Definition: querying.py:183
def __init__(self, connection)
Definition: querying.py:298
def setup(self)
Definition: querying.py:77
def close_session(self)
Definition: querying.py:179
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
Definition: querying.py:207
def write(self, object)
Definition: querying.py:266
def _cms_frontier_string(database, schema="cms_conditions")
Definition: querying.py:127
def engine_from_dictionary(dictionary, pooling=True)
Definition: querying.py:427
def payload(self, pkargs)
Definition: querying.py:219
def build_oracle_url(user, pwd, db_name)
Definition: querying.py:142
def iov(self, pkargs)
Definition: querying.py:216
def _get_CMS_frontier_connection_string(database)
Definition: querying.py:119
def commit(self)
Definition: querying.py:271
def new_connection_dictionary(connection_data, secrets=None, mode="r")
Definition: querying.py:348
def apply_filters(orm_query, orm_class, filters)
Definition: models.py:182
def _oracle_match_format(self, string)
Definition: querying.py:226
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
Definition: querying.py:36
def tear_down(self)
Definition: querying.py:172
def tag(self, pkargs)
Definition: querying.py:213
#define str(s)
def search_everything(self, string, amount=10)
Definition: querying.py:230
def _get_netrc_data(netrc_file, key)
Definition: querying.py:335
double split
Definition: MVATrainer.cc:139
def _cms_oracle_string(user, pwd, db_name)
Definition: querying.py:135