CMS 3D CMS Logo

querying.py
Go to the documentation of this file.
1 """
2 
3 connection class translates a connection string for sqlite, oracle or frontier into a connection object.
4 Also sets up ORM with SQLAlchemy.
5 
6 connection class can also take a pre-constructed engine - useful for web services.
7 
8 """
9 
10 import sqlalchemy
11 from sqlalchemy import create_engine, text, or_
12 from sqlalchemy.orm import sessionmaker
13 from sqlalchemy.pool import NullPool
14 import datetime
15 from data_sources import json_data_node
16 from copy import deepcopy
17 import models
18 import traceback
19 import os
20 import netrc
21 import sys
22 
# Class-level defaults; every attribute starts out as None.
# engine: replaced by setup() (e.g. with a user-supplied engine)
engine = None
# connection: not assigned anywhere in the visible part of this file
connection = None
# session: replaced in setup() by a session created from the sessionmaker
session = None
# connection_data: connection dictionary or pre-built engine stored by __init__
connection_data = None
# netrc_authenticators: only referenced from commented-out code in setup()
netrc_authenticators = None
# secrets: not assigned on the instance in the visible code -- TODO confirm usage
secrets = None
"""

Given a connection string, parses the connection string and connects.

"""
35  def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
36 
37  self._pooling = pooling
38 
39  # add querying utility properties
40  # these must belong to the connection since the way in which their values are handled
41  # depends on the database being connected to.
45  self.regexp.connection_object = self
46 
47  if type(connection_data) in [str, unicode]:
48  # if we've been given a connection string, process it
49  self.connection_data = new_connection_dictionary(connection_data, secrets=secrets, mode=mode)
50  self.schema = self.connection_data.get("schema") if self.connection_data.get("schema") != None else ""
51 
52  self.range.database_type = self.connection_data["host"]
53  self.radius.database_type = self.connection_data["host"]
54  self.regexp.database_type = self.connection_data["host"]
55  else:
56  self.connection_data = connection_data
57  # assume we have an engine
58  # we need to take the string representation so we know which type of db we're aiming at
59  engine_string = str(connection_data)
60  db_type = None
61  if "oracle" in engine_string:
62  db_type = "oracle"
63  elif "frontier" in engine_string:
64  db_type = "frontier"
65  elif "sqlite" in engine_string:
66  db_type = "sqlite"
67 
68  self.range.database_type = db_type
69  self.radius.database_type = db_type
70  self.regexp.database_type = db_type
71 
72  import models as ms
73  self.models = ms.generate(map_blobs)
74  #self.base = self.models["Base"]
75 
76  def setup(self):
77  """
78  Setup engine with given credentials from netrc file, and make a session maker.
79  """
80 
81  if type(self.connection_data) == dict:
83  else:
84  # we've been given an engine by the user
85  # use it as the engine
86  self.engine = self.connection_data
87 
88  self.sessionmaker = sessionmaker(bind=self.engine)
89  self.session = self.sessionmaker()
90  self.factory = factory(self)
91 
92  # assign correct schema for database name to each model
93  tmp_models_dict = {}
94  for key in self.models:
95  if self.models[key].__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta\
96  and str(self.models[key].__name__) != "Base":
97 
98  if type(self.connection_data) == dict:
99  # we can only extract the secrets and schema individuall
100  # if we were given a dictionary... if we were given an engine
101  # we can't do this without parsing the connection string from the engine
102  # - a wide range of which it will be difficult to support!
103  self.models[key].__table__.schema = self.connection_data["schema"]
104  self.models[key].secrets = self.connection_data["secrets"]
105 
106  self.models[key].session = self.session
107  # isn't used anywhere - comment it out for now
108  #self.models[key].authentication = self.netrc_authenticators
109  self.models[key].connection = self
110  tmp_models_dict[key.lower()] = self.models[key]
111  tmp_models_dict[key.lower()].empty = False
112 
113  self.models = tmp_models_dict
114 
115  return self
116 
117  @staticmethod
119  try:
120  import subprocess
121  return subprocess.Popen(['cmsGetFnConnect', 'frontier://%s' % database], stdout = subprocess.PIPE).communicate()[0].strip()
122  except:
123  raise Exception("Frontier connections can only be constructed when inside a CMSSW environment.")
124 
@staticmethod
def _cms_frontier_string(database, schema="cms_conditions"):
    """
    Build the SQLAlchemy URL string used for a frontier database.

    The frontier connection string resolved for `database` is URL-quoted and
    used as the host part of the URL; `schema` is appended as the path.
    """
    import urllib
    quote = urllib.quote_plus
    frontier_connect = connection._get_CMS_frontier_connection_string(database)
    return 'oracle+frontier://@%s/%s' % (quote(frontier_connect), schema)
132 
133  @staticmethod
134  def _cms_oracle_string(user, pwd, db_name):
135  """
136  Get database string for oracle.
137  """
138  return 'oracle://%s:%s@%s' % (user, pwd, db_name)
139 
@staticmethod
def build_oracle_url(user, pwd, db_name):
    """
    Build the SQLAlchemy URL object for an oracle connection.

    The oracle string for (user, pwd, db_name) is parsed with make_url; if
    the parsed URL carries no password, `pwd` is set on it.  When the string
    cannot be parsed (ArgumentError), a sqlite URL for `db_name` is returned
    instead.
    """
    oracle_string = connection._cms_oracle_string(user, pwd, db_name)

    try:
        parsed_url = sqlalchemy.engine.url.make_url(oracle_string)
        if parsed_url.password is None:
            parsed_url.password = pwd
        return parsed_url
    except sqlalchemy.exc.ArgumentError:
        # fall back to treating db_name as a sqlite file
        return sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
155 
@staticmethod
def build_frontier_url(db_name, schema):
    """
    Build the SQLAlchemy URL object for a frontier connection.

    Falls back to a sqlite URL for `db_name` when the frontier string cannot
    be parsed (ArgumentError).
    """
    frontier_string = connection._cms_frontier_string(db_name, schema)

    try:
        return sqlalchemy.engine.url.make_url(frontier_string)
    except sqlalchemy.exc.ArgumentError:
        # Is this needed for a use case?
        return sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
168 
def tear_down(self):
    """
    Commit the current session and close it (ends the transaction and
    closes the session).  May do other things later.

    Returns None on success.  If committing or closing raises an error, a
    message string naming the engine is returned instead of raising.
    """
    try:
        self.session.commit()
        self.close_session()
    except Exception:
        # Best-effort teardown: ordinary errors are reported through the
        # return value.  KeyboardInterrupt and SystemExit are deliberately
        # not caught so that the process can still be interrupted.
        return "Couldn't tear down connection on engine %s." % str(self.engine)
177 
def close_session(self):
    """Close the current session; always returns True."""
    active_session = self.session
    active_session.close()
    return True
181 
def hard_close(self):
    """Dispose of the engine held by this connection; always returns True."""
    current_engine = self.engine
    current_engine.dispose()
    return True
185 
def model(self, model_name):
    """
    Get a model held by this connection, based on the given model name.

    `model_name` is either a name or a declarative model class, in which
    case the class's __name__ is used.  Underscores are removed from the
    name before it is looked up in self.models.
    """
    declarative_meta = sqlalchemy.ext.declarative.api.DeclarativeMeta
    if model_name.__class__ == declarative_meta:
        lookup_name = model_name.__name__
    else:
        lookup_name = model_name
    return self.models[lookup_name.replace("_", "")]
192 
def object(self, model, pk_to_value):
    """
    Query for a single object of the given model.

    `model` is the class the developer wants to be instantiated and
    `pk_to_value` maps primary key names to the values to filter on.
    Returns the first matching row, or None when there is no session.
    """
    if self.session is None:
        return None
    query = self.session.query(model)
    for column_name, wanted_value in pk_to_value.items():
        column = model.__dict__[column_name]
        query = query.filter(column == wanted_value)
    return query.first()
202 
def global_tag(self, **pkargs):
    """Delegate to the factory to look up "globaltag" objects matching the given keyword arguments."""
    object_factory = self.factory
    return object_factory.object("globaltag", **pkargs)
205 
def global_tag_map(self, **pkargs):
    """Delegate to the factory to look up "globaltagmap" objects matching the given keyword arguments."""
    object_factory = self.factory
    return object_factory.object("globaltagmap", **pkargs)
208 
209  """def global_tag_map_request(self, **pkargs):
210  return self.factory.object("globaltagmaprequest", **pkargs)"""
211 
def tag(self, **pkargs):
    """Delegate to the factory to look up "tag" objects matching the given keyword arguments."""
    object_factory = self.factory
    return object_factory.object("tag", **pkargs)
214 
def iov(self, **pkargs):
    """Delegate to the factory to look up "iov" objects matching the given keyword arguments."""
    object_factory = self.factory
    return object_factory.object("iov", **pkargs)
217 
def payload(self, **pkargs):
    """Delegate to the factory to look up "payload" objects matching the given keyword arguments."""
    object_factory = self.factory
    return object_factory.object("payload", **pkargs)
220 
221  """def record(self, **pkargs):
222  return self.factory.object("record", **pkargs)"""
223 
224  # adds %% at the beginning and end so LIKE in SQL searches all of the string
225  def _oracle_match_format(self, string):
226  return "%%%s%%" % string
227 
def search_everything(self, string, amount=10):
    """
    Search global tags, tags, iovs and payloads for `string`.

    `string` is wrapped in wildcards and matched case-insensitively against
    a fixed set of columns per model; each query is limited to `amount`
    rows.  Returns a dictionary (passed through json_data_node.make) mapping
    object type to a list of all objects found in the search.
    """
    pattern = self._oracle_match_format(string)

    def limited_search(model_key, column_names):
        # one OR-ed ilike query over the listed columns of the given model
        model_class = self.model(model_key)
        base_query = self.session.query(model_class)
        conditions = [getattr(model_class, column_name).ilike(pattern)
                      for column_name in column_names]
        return base_query.filter(or_(*conditions)).limit(amount)

    global_tags = limited_search("globaltag", ["name", "description", "release"])
    tags = limited_search("tag", ["name", "object_type", "description"])
    iovs = limited_search("iov", ["tag_name", "since", "payload_hash", "insertion_time"])
    payloads = limited_search("payload", ["hash", "object_type", "insertion_time"])

    return json_data_node.make({
        "global_tags" : global_tags.all(),
        "tags" : tags.all(),
        "iovs" : iovs.all(),
        "payloads" : payloads.all()
    })
264 
def write(self, object):
    """
    Add a session-independent copy of `object` (built with this
    connection's schema) to the session and return that copy.

    Nothing is committed here.
    """
    detached_copy = models.session_independent_object(object, schema=self.schema)
    self.session.add(detached_copy)
    return detached_copy
269 
def commit(self):
    """
    Commit the current session.

    If the commit raises an error, the traceback is printed and the session
    is rolled back; the error is not re-raised.
    """
    try:
        self.session.commit()
    except Exception:
        # Only ordinary errors are handled here; KeyboardInterrupt and
        # SystemExit are deliberately left to propagate.
        traceback.print_exc()
        self.session.rollback()
276 
def write_and_commit(self, object):
    """
    Write `object` and commit straight away.

    A list is handled item by item (nested lists recursively), so every
    item is written and committed on its own.
    """
    if type(object) != list:
        # should be changed to deal with errors - add them to exception handling if they appear
        self.write(object)
        self.commit()
        return
    for entry in object:
        self.write_and_commit(entry)
285 
def rollback(self):
    """
    Roll back the current session.

    If the rollback raises an error, the traceback and a short message are
    printed; the error is not re-raised.
    """
    try:
        self.session.rollback()
    except Exception:
        # Only ordinary errors are handled here; KeyboardInterrupt and
        # SystemExit are deliberately left to propagate.
        traceback.print_exc()
        print("Session couldn't be rolled back.")
292 
class factory():
    """
    Contains methods for creating objects.
    """
    def __init__(self, connection):
        # connection object whose session and models are used by object()
        self.connection = connection

    def object(self, class_name, **pkargs):
        """
        Create or look up objects of the model named `class_name`.

        class_name is the class name of the model to be used.
        pkargs is a dictionary of keyword arguments used as primary key
        values (filters); an optional "amount" entry limits the number of rows.

        Returns:
          - None if the connection has no session, or if nothing matched;
          - a new, empty model instance (with empty = True) if no keyword
            arguments were given;
          - the single matching object if exactly one row matched;
          - a json_list of the matching objects if several rows matched.
        """
        from data_sources import json_list
        from models import apply_filters
        # get the class that self.connection holds from the class name
        model = self.connection.model(class_name)

        if self.connection.session is None:
            return None

        if not pkargs:
            # no column arguments were given, so return an empty object
            new_object = model()
            new_object.empty = True
            return new_object

        # apply the filters defined in **pkargs, then the optional row limit
        model_data = self.connection.session.query(model)
        model_data = apply_filters(model_data, model, **pkargs)
        model_data = model_data.limit(pkargs.get("amount"))

        # run the query once and decide on the return shape from the rows,
        # instead of issuing separate count queries before fetching
        matches = model_data.all()
        if len(matches) > 1:
            # if we have multiple objects, return a json_list
            return json_list(matches)
        if len(matches) == 1:
            # if we have a single object, return that object
            return matches[0]
        # if we have no objects returned, return None
        return None
333 
334 def _get_netrc_data(netrc_file, key):
335  """
336  Returns a dictionary {login : ..., account : ..., password : ...}
337  """
338  try:
339  headers = ["login", "account", "password"]
340  authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
341  if authenticator_tuple == None:
342  raise Exception("netrc file must contain key '%s'." % key)
343  except:
344  raise Exception("Couldn't get credentials from netrc file.")
345  return dict(zip(headers, authenticator_tuple))
346 
def new_connection_dictionary(connection_data, secrets=None, mode="r"):
    """
    Function used to construct connection data dictionaries - internal to framework.

    Supported connection strings:
      frontier://database_name/schema
      sqlite://database_file_name
      oracle://account:password@database_name
      oracle://database_name/schema  (credentials come from `secrets`, or
                                      are asked for interactively)

    secrets: None (prompt for credentials), the path of a netrc file, or a
             dictionary with "user" and "password" keys.
    mode:    "r" or "w"; selects the netrc entry
             database_name/schema_name/read or .../write.

    Returns a dictionary with the keys "database_name", "schema", "host" and
    "secrets" (plus "password" for oracle).  Anything that is not a string
    starting with one of the prefixes above is returned unchanged.
    """
    if not isinstance(connection_data, _text_types()):
        return connection_data

    frontier_prefix = "frontier://"
    sqlite_prefix = "sqlite://"
    oracle_prefix = "oracle://"

    if connection_data.startswith(frontier_prefix):
        return _frontier_connection_dictionary(connection_data[len(frontier_prefix):])
    if connection_data.startswith(sqlite_prefix):
        # for now, just support "sqlite://" format for sqlite connection strings
        return _sqlite_connection_dictionary(connection_data[len(sqlite_prefix):])
    if connection_data.startswith(oracle_prefix):
        return _oracle_connection_dictionary(connection_data[len(oracle_prefix):], secrets, mode)
    return connection_data


def _text_types():
    """Return the string types of the running interpreter (str, plus unicode where it exists)."""
    try:
        return (str, unicode)
    except NameError:
        return (str,)


def _frontier_connection_dictionary(location):
    """Build the connection dictionary for 'database_name/schema' (frontier needs no secrets)."""
    parts = location.split("/")
    return {
        "database_name": parts[0],
        "schema": parts[1],
        "host": "frontier",
        "secrets": None,
    }


def _sqlite_connection_dictionary(location):
    """Build the connection dictionary for a sqlite file name; the path is made absolute."""
    return {
        "database_name": os.path.abspath(location),
        "schema": "",
        "host": "sqlite",
        "secrets": None,
    }


def _oracle_connection_dictionary(location, secrets, mode):
    """Build the connection dictionary for 'account:password@database_name' or 'database_name/schema'."""
    if ":" in location:
        # the user has given a password - usually in the case of the db upload service
        colon = location.index(":")
        at_sign = location.index("@")
        database_name = location[at_sign + 1:]
        # the account name doubles as the schema name
        username = location[:colon]
        schema_name = username
        password = location[colon + 1:at_sign]
    else:
        slash = location.index("/")
        database_name = location[:slash]
        schema_name = location[slash + 1:]
        username, password = _oracle_credentials(database_name, schema_name, secrets, mode)

    return {
        "database_name": database_name,
        "schema": schema_name,
        "password": password,
        "host": "oracle",
        "secrets": {"login": username, "password": password},
    }


def _oracle_credentials(database_name, schema_name, secrets, mode):
    """Return (username, password) taken from a prompt, a netrc file path or a secrets dictionary."""
    if secrets is None:
        try:
            prompt = raw_input
        except NameError:
            prompt = input
        # NOTE(review): the password is typed at an ordinary prompt and is therefore echoed
        username = str(prompt("Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
        password = str(prompt("Enter the password for the user '%s' in database '%s': " % (username, database_name)))
        return username, password

    if isinstance(secrets, _text_types()):
        # secrets is the path of a netrc file: take the credentials from the netrc entry
        # corresponding to the mode the database is opened in, eg, for mode "r" the
        # database_name/schema_name/read entry will be taken
        mode_to_netrc_key_suffix = {"r" : "read", "w" : "write"}
        netrc_key = "%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
        netrc_data = _get_netrc_data(secrets, key=netrc_key)
        return netrc_data["login"], netrc_data["password"]

    if isinstance(secrets, dict):
        return secrets["user"], secrets["password"]

    raise Exception("Invalid type given for secrets. Either an str or a dict must be given.")
425 
def engine_from_dictionary(dictionary, pooling=True):
    """
    Create a SQLAlchemy engine from a connection dictionary.

    For sqlite the engine is created straight from the database file name
    and `pooling` is not used.  For frontier (no authentication) and any
    other host (probably oracle, authenticated with the login and password
    in dictionary["secrets"]) the engine is created with a maximum label
    length of 6, and with NullPool when `pooling` is false.
    """
    host = dictionary["host"]

    if host == "sqlite":
        # if host is sqlite, making the url is easy - no authentication
        return create_engine("sqlite:///%s" % dictionary["database_name"])

    # set max label length for oracle and frontier
    engine_options = {"label_length": 6}
    if not pooling:
        engine_options["poolclass"] = NullPool

    if host == "frontier":
        # if frontier, no need to authenticate
        url = connection.build_frontier_url(dictionary["database_name"], dictionary["schema"])
    else:
        # probably oracle
        # if not frontier, we have to authenticate
        credentials = dictionary["secrets"]
        url = connection.build_oracle_url(credentials["login"], credentials["password"], dictionary["database_name"])

    return create_engine(url, **engine_options)
448 
449 
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
    """
    Utility method for user - set up a connection object.

    Builds a connection from the given arguments, calls setup() on it and
    returns what setup() returns.
    """
    new_connection = connection(
        connection_data=connection_data,
        mode=mode,
        map_blobs=map_blobs,
        secrets=secrets,
        pooling=pooling,
    )
    return new_connection.setup()
def global_tag(self, pkargs)
Definition: querying.py:203
def session_independent_object(object, schema=None)
Definition: models.py:31
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:450
def rollback(self)
Definition: querying.py:286
def build_frontier_url(db_name, schema)
Definition: querying.py:157
def object(self, class_name, pkargs)
Definition: querying.py:303
def object(self, model, pk_to_value)
Definition: querying.py:195
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
static void * communicate(void *obj)
Definition: DQMNet.cc:1251
def model(self, model_name)
Definition: querying.py:187
def write_and_commit(self, object)
Definition: querying.py:277
def hard_close(self)
Definition: querying.py:182
def __init__(self, connection)
Definition: querying.py:297
def setup(self)
Definition: querying.py:76
def close_session(self)
Definition: querying.py:178
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
def global_tag_map(self, pkargs)
Definition: querying.py:206
def write(self, object)
Definition: querying.py:265
def _cms_frontier_string(database, schema="cms_conditions")
Definition: querying.py:126
def engine_from_dictionary(dictionary, pooling=True)
Definition: querying.py:426
def payload(self, pkargs)
Definition: querying.py:218
def build_oracle_url(user, pwd, db_name)
Definition: querying.py:141
def iov(self, pkargs)
Definition: querying.py:215
def _get_CMS_frontier_connection_string(database)
Definition: querying.py:118
def commit(self)
Definition: querying.py:270
def new_connection_dictionary(connection_data, secrets=None, mode="r")
Definition: querying.py:347
def apply_filters(orm_query, orm_class, filters)
Definition: models.py:181
def _oracle_match_format(self, string)
Definition: querying.py:225
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
Definition: querying.py:35
def tear_down(self)
Definition: querying.py:171
def tag(self, pkargs)
Definition: querying.py:212
def search_everything(self, string, amount=10)
Definition: querying.py:229
def _get_netrc_data(netrc_file, key)
Definition: querying.py:334
double split
Definition: MVATrainer.cc:139
def _cms_oracle_string(user, pwd, db_name)
Definition: querying.py:134