CMS 3D CMS Logo

querying.py
Go to the documentation of this file.
1 """
2 
3 connection class translates either a connection string for sqlite, oracle or frontier into a connection object.
4 Also sets up ORM with SQLAlchemy.
5 
6 connection class can also take a pre-constructed engine - useful for web services.
7 
8 """
9 from __future__ import print_function
10 from __future__ import absolute_import
11 
12 import sqlalchemy
13 from sqlalchemy import create_engine, text, or_
14 from sqlalchemy.orm import sessionmaker
15 from sqlalchemy.pool import NullPool
16 import datetime
17 from .data_sources import json_data_node
18 from copy import deepcopy
19 from . import models
20 import traceback
21 import os
22 import netrc
23 import sys
24 
class connection(object):
    """
    Given a connection string (or a pre-constructed engine), parses it and connects.

    A connection owns the SQLAlchemy engine and session, the generated ORM
    models, and the querying helpers (range / radius / regexp) whose behaviour
    depends on the type of database being connected to.
    """
    # NOTE(review): the "class" line itself was missing from the extracted
    # listing; the name is taken from the cross-reference index
    # ("querying.connection") - confirm the base class against the real file.
    engine = None
    connection = None
    session = None
    connection_data = None
    netrc_authenticators = None
    secrets = None

    def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False):
        """
        Store the connection parameters; no database access happens here (see setup()).

        connection_data: a connection string (processed by new_connection_dictionary),
                         or anything else, which is kept as-is and later used as the
                         engine (or, if it is a dict, turned into an engine) by setup().
        mode, secrets:   forwarded to new_connection_dictionary for connection strings.
        map_blobs:       forwarded to models.generate.
        pooling:         remembered for engine creation in setup().
        """
        self._pooling = pooling

        # add querying utility properties
        # these must belong to the connection since the way in which their values are handled
        # depends on the database being connected to.
        # NOTE(review): these three assignments were missing from the extracted
        # listing and are reconstructed from the cross-reference index
        # (models.Range / models.Radius / models.RegExp) - confirm.
        self.range = models.Range
        self.radius = models.Radius
        self.regexp = models.RegExp
        self.regexp.connection_object = self

        # (str, type(u"")) is (str, unicode) on Python 2 and (str, str) on Python 3,
        # so no reference to the Python 2-only name "unicode" is needed
        string_types = (str, type(u""))

        if isinstance(connection_data, string_types):
            # if we've been given a connection string, process it
            self.connection_data = new_connection_dictionary(connection_data, secrets=secrets, mode=mode)
            schema = self.connection_data.get("schema")
            self.schema = schema if schema is not None else ""
            db_type = self.connection_data["host"]
        else:
            self.connection_data = connection_data
            # write() reads self.schema, so it must exist on this path as well
            self.schema = ""
            if isinstance(connection_data, dict) and connection_data.get("schema") is not None:
                self.schema = connection_data["schema"]

            # assume we have an engine
            # we need to take the string representation so we know which type of db we're aiming at
            engine_string = str(connection_data)
            db_type = None
            # frontier urls have the form "oracle+frontier://...", so frontier
            # has to be tested before oracle
            if "frontier" in engine_string:
                db_type = "frontier"
            elif "oracle" in engine_string:
                db_type = "oracle"
            elif "sqlite" in engine_string:
                db_type = "sqlite"

        self.range.database_type = db_type
        self.radius.database_type = db_type
        self.regexp.database_type = db_type

        from . import models as ms
        self.models = ms.generate(map_blobs)

    def setup(self):
        """
        Create the engine (unless one was supplied), a session maker and a session,
        then bind every generated model to this connection.

        Returns self, so that calls can be chained.
        """
        have_dictionary = isinstance(self.connection_data, dict)

        if have_dictionary:
            # NOTE(review): this line was missing from the extracted listing and is
            # reconstructed from engine_from_dictionary's signature - confirm.
            self.engine = engine_from_dictionary(self.connection_data, pooling=self._pooling)
        else:
            # we've been given an engine by the user
            # use it as the engine
            self.engine = self.connection_data

        self.sessionmaker = sessionmaker(bind=self.engine)
        self.session = self.sessionmaker()
        self.factory = factory(self)

        # assign correct schema for database name to each model
        tmp_models_dict = {}
        for key in self.models:
            model = self.models[key]
            if model.__class__ == sqlalchemy.ext.declarative.api.DeclarativeMeta \
                    and str(model.__name__) != "Base":

                if have_dictionary:
                    # we can only extract the secrets and schema individually
                    # if we were given a dictionary... if we were given an engine
                    # we can't do this without parsing the connection string from the engine
                    # - a wide range of which it will be difficult to support!
                    model.__table__.schema = self.connection_data["schema"]
                    model.secrets = self.connection_data["secrets"]

                model.session = self.session
                model.connection = self
                model.empty = False
                # models are stored under lower-case keys from here on
                tmp_models_dict[key.lower()] = model

        self.models = tmp_models_dict

        return self

    @staticmethod
    def _get_CMS_frontier_connection_string(database):
        """
        Run cmsGetFnConnect for the given database and return its stripped standard output.

        Raises Exception if the command cannot be run.
        """
        # NOTE(review): the "def" line was missing from the extracted listing; the
        # signature is taken from the cross-reference index.
        try:
            import subprocess
            process = subprocess.Popen(['cmsGetFnConnect', 'frontier://%s' % database], stdout=subprocess.PIPE)
            return process.communicate()[0].strip()
        except Exception:
            raise Exception("Frontier connections can only be constructed when inside a CMSSW environment.")

    @staticmethod
    def _cms_frontier_string(database, schema="cms_conditions"):
        """
        Get database string for frontier.
        """
        # quote_plus lives in urllib on Python 2 and in urllib.parse on Python 3
        try:
            from urllib.parse import quote_plus
        except ImportError:
            from urllib import quote_plus
        return 'oracle+frontier://@%s/%s' % (quote_plus(connection._get_CMS_frontier_connection_string(database)), schema)

    @staticmethod
    def _cms_oracle_string(user, pwd, db_name):
        """
        Get database string for oracle.
        """
        return 'oracle://%s:%s@%s' % (user, pwd, db_name)

    @staticmethod
    def build_oracle_url(user, pwd, db_name):
        """
        Build the oracle connection url from the given user, password and database name.

        Falls back to a sqlite url for db_name if the oracle string cannot be parsed.
        """
        database_url = connection._cms_oracle_string(user, pwd, db_name)

        try:
            url = sqlalchemy.engine.url.make_url(database_url)
            if url.password is None:
                if hasattr(url, "set"):
                    # newer SQLAlchemy: URL objects are immutable, set() returns a copy
                    url = url.set(password=pwd)
                else:
                    url.password = pwd
        except sqlalchemy.exc.ArgumentError:
            url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
        return url

    @staticmethod
    def build_frontier_url(db_name, schema):
        """
        Build the frontier connection url for the given database name and schema.

        Falls back to a sqlite url for db_name if the frontier string cannot be parsed.
        """
        database_url = connection._cms_frontier_string(db_name, schema)

        try:
            url = sqlalchemy.engine.url.make_url(database_url)
        except sqlalchemy.exc.ArgumentError:
            # Is this needed for a use case?
            url = sqlalchemy.engine.url.make_url('sqlite:///%s' % db_name)
        return url

    # currently just commits and closes the current session (ends transaction, closes connection)
    # may do other things later
    def tear_down(self):
        """
        Commit and close the session.

        Returns None on success, or an error message string if either step failed.
        """
        try:
            self.session.commit()
            self.close_session()
        except Exception:
            return "Couldn't tear down connection on engine %s." % str(self.engine)

    def close_session(self):
        """Close the current session; always returns True."""
        self.session.close()
        return True

    def hard_close(self):
        """Dispose of the engine; always returns True."""
        self.engine.dispose()
        return True

    # get model based on given model name
    def model(self, model_name):
        """
        Return the model stored under model_name (underscores are ignored).

        model_name may also be a model class, in which case its __name__ is used.
        Raises KeyError if no such model is known.
        """
        if isinstance(model_name, type):
            model_name = model_name.__name__
        model_name = model_name.replace("_", "")
        try:
            return self.models[model_name]
        except KeyError:
            # setup() stores models under lower-case keys, whereas class names
            # are usually mixed case
            return self.models[model_name.lower()]

    # model should be the class the developer wants to be instantiated
    # pk_to_value maps primary keys to values
    def object(self, model, pk_to_value):
        """
        Return the first row of model whose columns match pk_to_value,
        or None if there is no session or no matching row.
        """
        if self.session is None:
            return None
        model_data = self.session.query(model)
        for pk in pk_to_value:
            model_data = model_data.filter(model.__dict__[pk] == pk_to_value[pk])
        return model_data.first()

    def global_tag(self, **pkargs):
        """Look up global tags through the factory (see factory.object)."""
        return self.factory.object("globaltag", **pkargs)

    def global_tag_map(self, **pkargs):
        """Look up global tag maps through the factory (see factory.object)."""
        return self.factory.object("globaltagmap", **pkargs)

    def tag(self, **pkargs):
        """Look up tags through the factory (see factory.object)."""
        return self.factory.object("tag", **pkargs)

    def iov(self, **pkargs):
        """Look up IOVs through the factory (see factory.object)."""
        return self.factory.object("iov", **pkargs)

    def payload(self, **pkargs):
        """Look up payloads through the factory (see factory.object)."""
        return self.factory.object("payload", **pkargs)

    # adds % at the beginning and end so LIKE in SQL searches all of the string
    def _oracle_match_format(self, string):
        return "%%%s%%" % string

    # returns dictionary mapping object type to a list of all objects found in the search
    def search_everything(self, string, amount=10):
        """
        Case-insensitively search global tags, tags, IOVs and payloads for string.

        At most `amount` rows are returned per object type.
        """
        string = self._oracle_match_format(string)

        gt = self.model("globaltag")
        global_tags = self.session.query(gt).filter(or_(
            gt.name.ilike(string),
            gt.description.ilike(string),
            gt.release.ilike(string)
        )).limit(amount)

        tag = self.model("tag")
        tags = self.session.query(tag).filter(or_(
            tag.name.ilike(string),
            tag.object_type.ilike(string),
            tag.description.ilike(string)
        )).limit(amount)

        iov = self.model("iov")
        iovs = self.session.query(iov).filter(or_(
            iov.tag_name.ilike(string),
            iov.since.ilike(string),
            iov.payload_hash.ilike(string),
            iov.insertion_time.ilike(string)
        )).limit(amount)

        payload = self.model("payload")
        payloads = self.session.query(payload).filter(or_(
            payload.hash.ilike(string),
            payload.object_type.ilike(string),
            payload.insertion_time.ilike(string)
        )).limit(amount)

        return json_data_node.make({
            "global_tags" : global_tags.all(),
            "tags" : tags.all(),
            "iovs" : iovs.all(),
            "payloads" : payloads.all()
        })

    def write(self, object):
        """
        Add a session-independent copy of object to the session (without committing)
        and return that copy.
        """
        new_object = models.session_independent_object(object, schema=self.schema)
        self.session.add(new_object)
        return new_object

    def commit(self):
        """Commit the session; on failure print the traceback and roll back."""
        try:
            self.session.commit()
        except Exception:
            traceback.print_exc()
            self.session.rollback()

    def write_and_commit(self, object):
        """Write and commit object; a list is handled item by item."""
        if isinstance(object, list):
            for item in object:
                self.write_and_commit(item)
        else:
            # should be changed to deal with errors - add them to exception handling if they appear
            self.write(object)
            self.commit()

    def rollback(self):
        """Roll back the session; on failure print the traceback and a message."""
        try:
            self.session.rollback()
        except Exception:
            traceback.print_exc()
            print("Session couldn't be rolled back.")
294 
class factory():
    """
    Contains methods for creating objects.
    """
    def __init__(self, connection):
        # the connection whose models and session are used for every lookup
        self.connection = connection

    # class_name is the class name of the model to be used
    # pkargs is a dictionary of keyword arguments used as primary key values
    # this dictionary will be used to populate the object of type name class_name
    def object(self, class_name, **pkargs):
        """
        Query the model called class_name, filtered by the given keyword arguments.

        Returns None if the connection has no session or nothing matched, the single
        matching object if exactly one row matched, and a json_list of all matches
        otherwise. Without keyword arguments a new, empty model instance is returned.
        The optional "amount" keyword limits the number of rows.
        """
        from .data_sources import json_list
        from .models import apply_filters
        # get the class that self.connection holds from the class name
        model = self.connection.model(class_name)

        if self.connection.session is None:
            return None

        # query for the ORM object, and return the appropriate object (None, CondDBFW object, or json_list)
        model_data = self.connection.session.query(model)

        if not pkargs:
            # no column arguments were given, so return an empty object
            new_object = model()
            new_object.empty = True
            return new_object

        # apply the filters defined in **kwargs
        model_data = apply_filters(model_data, model, **pkargs)
        model_data = model_data.limit(pkargs.get("amount"))

        # count once - every count() call is a separate query
        number_of_matches = model_data.count()
        if number_of_matches > 1:
            # if we have multiple objects, return a json_list
            return json_list(model_data.all())
        if number_of_matches == 1:
            # if we have a single object, return that object
            return model_data.first()
        # if we have no objects returned, return None
        return None
335 
336 def _get_netrc_data(netrc_file, key):
337  """
338  Returns a dictionary {login : ..., account : ..., password : ...}
339  """
340  try:
341  headers = ["login", "account", "password"]
342  authenticator_tuple = netrc.netrc(netrc_file).authenticators(key)
343  if authenticator_tuple == None:
344  raise Exception("netrc file must contain key '%s'." % key)
345  except:
346  raise Exception("Couldn't get credentials from netrc file.")
347  return dict(zip(headers, authenticator_tuple))
348 
def new_connection_dictionary(connection_data, secrets=None, mode="r"):
    """
    Function used to construct connection data dictionaries - internal to framework.

    Recognised connection strings:
      frontier://database_name/schema
      sqlite://database_file_name
      oracle://account:password@database_name
      oracle://database_name/schema   (credentials come from secrets, or are prompted for)

    secrets may be None (prompt the user), the path of a netrc file, or a dictionary
    with the keys "user" and "password". mode ("r" or "w", None meaning "r") selects
    the netrc entry database_name/schema_name/read or .../write.

    Returns a dictionary with the keys "database_name", "schema", "host" and "secrets"
    (plus "password" for oracle). Anything that is not a recognised connection string
    is returned unchanged.
    """
    # (str, unicode) on Python 2, (str, str) on Python 3
    string_types = (str, type(u""))
    if not isinstance(connection_data, string_types):
        return connection_data

    frontier_prefix = "frontier://"
    sqlite_prefix = "sqlite://"
    oracle_prefix = "oracle://"

    if connection_data.startswith(frontier_prefix):
        # frontier://database_name/schema
        parts = connection_data[len(frontier_prefix):].split("/")
        return {
            "database_name" : parts[0],
            "schema" : parts[1],
            "host" : "frontier",
            "secrets" : None
        }

    if connection_data.startswith(sqlite_prefix):
        # sqlite://database_file_name
        # for now, just support "sqlite://" format for sqlite connection strings
        db_name = connection_data[len(sqlite_prefix):]
        return {
            "database_name" : os.path.abspath(db_name),
            "schema" : "",
            "host" : "sqlite",
            "secrets" : None
        }

    if connection_data.startswith(oracle_prefix):
        # oracle://account:password@database_name
        # or
        # oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc)
        new_connection_string = connection_data[len(oracle_prefix):]

        if ":" in new_connection_string:
            # the user has given a password - usually in the case of the db upload service
            colon = new_connection_string.index(":")
            at_sign = new_connection_string.index("@")
            database_name = new_connection_string[at_sign + 1:]
            # set username based on connection string; the schema has the same name
            username = new_connection_string[:colon]
            schema_name = username
            password = new_connection_string[colon + 1:at_sign]
        else:
            slash = new_connection_string.index("/")
            database_name = new_connection_string[:slash]
            schema_name = new_connection_string[slash + 1:]
            if secrets is None:
                # raw_input only exists on Python 2, where input() would evaluate the text
                try:
                    read_line = raw_input
                except NameError:
                    read_line = input
                username = str(read_line("Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
                password = str(read_line("Enter the password for the user '%s' in database '%s': " % (username, database_name)))
            elif isinstance(secrets, string_types):
                mode_to_netrc_key_suffix = {"r" : "read", "w" : "write"}
                # None is treated like this function's default mode, "r"
                netrc_key_suffix = mode_to_netrc_key_suffix["r" if mode is None else mode]
                netrc_key = "%s/%s/%s" % (database_name, schema_name, netrc_key_suffix)
                netrc_data = _get_netrc_data(secrets, key=netrc_key)
                # take the username from the netrc entry corresponding to the mode the database is opened in
                # eg, if the user has given mode="r", the database_name/schema_name/read entry will be taken
                username = netrc_data["login"]
                password = netrc_data["password"]
            elif isinstance(secrets, dict):
                username = secrets["user"]
                password = secrets["password"]
            else:
                raise Exception("Invalid type given for secrets. Either an str or a dict must be given.")

        return {
            "database_name" : database_name,
            "schema" : schema_name,
            "password" : password,
            "host" : "oracle",
            "secrets" : {"login" : username, "password" : password}
        }

    # not a connection string we know how to handle - hand it back unchanged
    return connection_data
427 
def engine_from_dictionary(dictionary, pooling=True):
    """
    Create an engine from a connection data dictionary.

    dictionary["host"] selects the kind of url: "sqlite", "frontier", or anything
    else, which is treated as oracle and needs dictionary["secrets"].
    pooling=False makes oracle and frontier engines use NullPool; the sqlite
    engine is created the same way regardless of pooling.
    """
    host = dictionary["host"]

    if host == "sqlite":
        # if host is sqlite, making the url is easy - no authentication
        return create_engine("sqlite:///%s" % dictionary["database_name"])

    if host == "frontier":
        # if frontier, no need to authenticate
        url = connection.build_frontier_url(dictionary["database_name"], dictionary["schema"])
    else:
        # probably oracle
        # if not frontier, we have to authenticate
        credentials = dictionary["secrets"]
        url = connection.build_oracle_url(credentials["login"], credentials["password"], dictionary["database_name"])

    # set max label length for oracle and frontier
    if pooling:
        return create_engine(url, label_length=6)
    return create_engine(url, label_length=6, poolclass=NullPool)
450 
451 
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
    """
    Utility method for user - build a connection object from the given
    arguments, call setup() on it and return the result.
    """
    new_connection = connection(
        connection_data=connection_data,
        mode=mode,
        map_blobs=map_blobs,
        secrets=secrets,
        pooling=pooling
    )
    return new_connection.setup()
querying.connection.close_session
def close_session(self)
Definition: querying.py:180
querying.connection.setup
def setup(self)
Definition: querying.py:78
querying.connection.write
def write(self, object)
Definition: querying.py:267
querying.factory.connection
connection
Definition: querying.py:300
querying.connection.sessionmaker
sessionmaker
Definition: querying.py:90
querying._get_netrc_data
def _get_netrc_data(netrc_file, key)
Definition: querying.py:336
querying.factory.__init__
def __init__(self, connection)
Definition: querying.py:299
querying.connection.regexp
regexp
Definition: querying.py:46
querying.connection.payload
def payload(self, **pkargs)
Definition: querying.py:220
digitizers_cfi.strip
strip
Definition: digitizers_cfi.py:19
querying.connection._pooling
_pooling
Definition: querying.py:39
cms::xerces::dispose
void dispose(XMLCh *ptr)
Definition: XercesStrUtils.h:15
querying.new_connection_dictionary
def new_connection_dictionary(connection_data, secrets=None, mode="r")
Definition: querying.py:349
querying.connection
Definition: querying.py:25
querying.connection._get_CMS_frontier_connection_string
def _get_CMS_frontier_connection_string(database)
Definition: querying.py:120
querying.connection.commit
def commit(self)
Definition: querying.py:272
models.Radius
Definition: models.py:102
querying.factory.object
def object(self, class_name, **pkargs)
Definition: querying.py:305
querying.connect
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:452
cms::dd::split
std::vector< std::string_view > split(std::string_view, const char *)
querying.connection.schema
schema
Definition: querying.py:52
querying.connection.build_oracle_url
def build_oracle_url(user, pwd, db_name)
Definition: querying.py:143
models.RegExp
Definition: models.py:132
ReggeGribovPartonMC_EposLHC_2760GeV_PbPb_cfi.model
model
Definition: ReggeGribovPartonMC_EposLHC_2760GeV_PbPb_cfi.py:11
querying.connection.global_tag
def global_tag(self, **pkargs)
Definition: querying.py:205
querying.connection.__init__
def __init__(self, connection_data, mode=None, map_blobs=False, secrets=None, pooling=False)
Definition: querying.py:37
querying.connection.search_everything
def search_everything(self, string, amount=10)
Definition: querying.py:231
querying.connection.radius
radius
Definition: querying.py:45
query
Definition: query.py:1
querying.connection.models
models
Definition: querying.py:75
querying.connection._cms_oracle_string
def _cms_oracle_string(user, pwd, db_name)
Definition: querying.py:136
querying.connection._oracle_match_format
def _oracle_match_format(self, string)
Definition: querying.py:227
str
#define str(s)
Definition: TestProcessor.cc:48
communicate
static void * communicate(void *obj)
Definition: DQMNet.cc:1049
querying.connection.tear_down
def tear_down(self)
Definition: querying.py:173
ALCARECOTkAlBeamHalo_cff.filter
filter
Definition: ALCARECOTkAlBeamHalo_cff.py:27
querying.connection.global_tag_map
def global_tag_map(self, **pkargs)
Definition: querying.py:208
querying.connection.tag
def tag(self, **pkargs)
Definition: querying.py:214
PVValHelper::add
void add(std::map< std::string, TH1 * > &h, TH1 *hist)
Definition: PVValidationHelpers.cc:12
Exception
querying.connection.engine
engine
Definition: querying.py:84
querying.connection.build_frontier_url
def build_frontier_url(db_name, schema)
Definition: querying.py:159
edm::print
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:66
querying.connection.object
def object(self, model, pk_to_value)
Definition: querying.py:197
ComparisonHelper::zip
OutputIterator zip(InputIterator1 first1, InputIterator1 last1, InputIterator2 first2, InputIterator2 last2, OutputIterator result, Compare comp)
Definition: L1TStage2CaloLayer1.h:38
querying.connection.rollback
def rollback(self)
Definition: querying.py:288
querying.connection.factory
factory
Definition: querying.py:92
querying.connection.model
def model(self, model_name)
Definition: querying.py:189
querying.engine_from_dictionary
def engine_from_dictionary(dictionary, pooling=True)
Definition: querying.py:428
querying.connection.range
range
Definition: querying.py:44
models.Range
Definition: models.py:118
remoteMonitoring_LED_IterMethod_cfg.limit
limit
Definition: remoteMonitoring_LED_IterMethod_cfg.py:427
querying.connection.iov
def iov(self, **pkargs)
Definition: querying.py:217
models.session_independent_object
def session_independent_object(object, schema=None)
Definition: models.py:33
querying.factory
Definition: querying.py:295
querying.connection.write_and_commit
def write_and_commit(self, object)
Definition: querying.py:279
querying.connection.connection_data
connection_data
Definition: querying.py:51
models.apply_filters
def apply_filters(orm_query, orm_class, **filters)
Definition: models.py:183
querying.connection._cms_frontier_string
def _cms_frontier_string(database, schema="cms_conditions")
Definition: querying.py:128
querying.connection.session
session
Definition: querying.py:91
querying.connection.hard_close
def hard_close(self)
Definition: querying.py:184