CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Pages
testing_classes.py
Go to the documentation of this file.
1 """
2 
3 This file contains all testing suites to be used to test the framework.
4 
5 Note: avoid asserting on the lengths of lists, or on other characteristics of data read from the DB.
6 The DB contents may change, and that should not cause tests to fail.
7 
8 TODO: Change the tests so that every available connection is exercised when testing queries, provided this is acceptable for the DBs.
9 
10 """
11 
12 import unittest
13 import sys
14 import datetime
15 import pprint
16 
17 import CondCore.Utilities.CondDBFW.querying as querying
18 import CondCore.Utilities.CondDBFW.data_sources as data_sources
19 import CondCore.Utilities.CondDBFW.data_formats as data_formats
20 import CondCore.Utilities.CondDBFW.shell as shell
21 from CondCore.Utilities.CondDBFW import querying_framework_api as api
22 
23 secrets_file = "/afs/cern.ch/cms/DB/conddb/.cms_cond/netrc"
24 
25 class querying_tests(unittest.TestCase):
26 
def setUp(self):
    """Open a connection for the test about to run.

    The connection is made with the "orapro" alias on the "oracle" host,
    using the module-level secrets file, and stored on ``self.connection``.
    """
    settings = dict(
        db_alias="orapro",
        host="oracle",
        schema="cms_conditions",
        secrets=secrets_file,
    )
    self.connection = querying.connect(settings)
30 
32  self.assertTrue(self.connection != None)
33 
def test_get_tag(self):
    """Look up a known tag by name and check the object that comes back.

    Only the presence of the result, the presence of its ``name`` and the
    name of its class are checked.
    """
    # hard code tag for now
    tag_name = "EBAlignment_measured_v01_express"
    tag = self.connection.tag(name=tag_name)
    # this tag exists, so shouldn't be NoneType
    # (assertIsNotNone is an identity check and reports the value on failure)
    self.assertIsNotNone(tag)
    # we gave the name, so that should at least be in the tag object
    self.assertIsNotNone(tag.name)
    self.assertEqual(tag.__class__.__name__.lower(), "tag")
43 
def test_get_empty_tag(self):
    """Ask the connection for a tag without arguments.

    The call must still return an object (not ``None``) whose class name,
    lower-cased, is "tag".
    """
    tag = self.connection.tag()
    # Identity check against None rather than `!= None`.
    self.assertIsNotNone(tag)
    self.assertEqual(tag.__class__.__name__.lower(), "tag")
48 
50  # hard coded global tag for now
51  global_tag_name = "74X_dataRun1_HLT_frozen_v2"
52  global_tag = self.connection.global_tag(name=global_tag_name)
53  self.assertTrue(global_tag != None)
54  self.assertTrue(global_tag.name != None)
55  self.assertTrue(global_tag.tags() != None)
56  self.assertEqual(global_tag.__class__.__name__.lower(), "globaltag")
57 
def test_get_payload(self):
    """Look up a payload by its hash and check the object that comes back.

    The result must not be ``None`` and its class name, lower-cased, must
    be "payload".
    """
    # hard coded payload for now
    payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
    payload = self.connection.payload(hash=payload_hash)
    # Identity check against None rather than `!= None`.
    self.assertIsNotNone(payload)
    self.assertEqual(payload.__class__.__name__.lower(), "payload")
64 
66  payload = self.connection.payload()
67  self.assertTrue(payload != None)
68  self.assertEqual(payload.__class__.__name__.lower(), "payload")
69 
71  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
72  payload = self.connection.payload(hash=payload_hash)
73  self.assertTrue(payload != None)
74  parent_tags = payload.parent_tags()
75  self.assertTrue(parent_tags != None)
76 
78  tag_name = "EBAlignment_measured_v01_express"
79  tag = self.connection.tag(name=tag_name)
80  self.assertTrue(tag != None)
81  parent_global_tags = tag.parent_global_tags()
82  self.assertTrue(parent_global_tags != None)
83 
85  empty_tag = self.connection.tag()
86  all_tags = empty_tag.all(10)
87  self.assertTrue(all_tags != None)
88  # there are always tags in the db
89  self.assertTrue(len(all_tags.data()) != 0)
90 
92  tag_name = "EBAlignment_measured_v01_express"
93  empty_tag = self.connection.tag(name=tag_name)
94  all_tags = empty_tag.all(10)
95  self.assertTrue(all_tags != None)
96  # there are always tags in the db
97  self.assertTrue(len(all_tags.data()) != 0)
98 
100  empty_gt = self.connection.global_tag()
101  all_gts = empty_gt.all(10)
102  self.assertTrue(all_gts != None)
103  self.assertTrue(len(all_gts.data()) != 0)
104 
106  string_for_non_empty_result = "ecal"
107  data = self.connection.search_everything(string_for_non_empty_result)
108  self.assertTrue(len(data.get("global_tags").data()) != 0)
109  self.assertTrue(len(data.get("tags").data()) != 0)
110  self.assertTrue(len(data.get("payloads").data()) != 0)
111 
113  tags = self.connection.tag(time_type="Run")
114  self.assertTrue(len(tags.data()) > 1)
115 
117  tag = self.connection.tag(name="EBAlignment_hlt")
118  self.assertEqual(tag.__class__.__name__.lower(), "tag")
119 
121  tag = self.connection.tag()
122  self.assertTrue(tag.empty)
123 
125  tag = self.connection.tag(name="dfasdf")
126  self.assertTrue(tag == None)
127 
def tearDown(self):
    """Close the session of the connection created in ``setUp``."""
    open_connection = self.connection
    open_connection.close_session()
130 
131 class data_sources_tests(unittest.TestCase):
132 
def setUp(self):
    """Connect before each test and keep the handle on ``self.connection``."""
    self.connection = querying.connect({
        "db_alias" : "orapro",
        "host" : "oracle",
        "schema" : "cms_conditions",
        "secrets" : secrets_file,
    })
136 
138  test_list = list(range(0, 10))
139  json_list_object = data_sources.json_data_node.make(test_list)
140  self.assertTrue(json_list_object != None)
141  self.assertEqual(json_list_object.data(), test_list)
142  for n in range(0, len(test_list)):
143  self.assertEqual(json_list_object.get(n).data(), test_list[n])
144 
146  test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
147  json_dict_object = data_sources.json_data_node.make(test_dict)
148  self.assertTrue(json_dict_object != None)
149  self.assertEqual(json_dict_object.data(), test_dict)
150  for key in test_dict:
151  self.assertEqual(json_dict_object.get(key).data(), test_dict[key])
152 
154  structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
155  json_structure_object = data_sources.json_data_node.make(structure)
156  self.assertEqual(json_structure_object.get("key1").data(), structure["key1"])
157  self.assertEqual(json_structure_object.get("key2").data(), structure["key2"])
158 
160  structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
161  new_structure = data_sources.json_data_node.make({})
162  new_structure.add_key([], "key1")
163  new_structure.get("key1").add_child({"a" : 1, "b" : 3})
164  new_structure.get("key1").add_child({"a" : 4, "b" : 8})
165  new_structure.add_key([], "key2")
166  new_structure.get("key2").add_child("header1")
167  new_structure.get("key2").add_child("header2")
168  new_structure.get("key2").add_child("header3")
169  self.assertEqual(new_structure.data(), structure)
170 
def test_check_types(self):
    """Check which wrapper class ``json_data_node.make`` returns.

    A list input is expected to give an object whose class is named
    "json_list"; a dict input one whose class is named "json_dict".
    """
    list_input = list(range(0, 10))
    dict_input = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
    wrapped_list = data_sources.json_data_node.make(list_input)
    wrapped_dict = data_sources.json_data_node.make(dict_input)
    expectations = (
        (wrapped_list, "json_list"),
        (wrapped_dict, "json_dict"),
    )
    for wrapped, expected_class_name in expectations:
        self.assertEqual(wrapped.__class__.__name__, expected_class_name)
178 
180  all_tags = self.connection.tag().all(10)
181  self.assertEqual(all_tags.__class__.__name__, "json_list")
182 
184  tag_name = "EBAlignment_measured_v01_express"
185  tag = self.connection.tag(name=tag_name)
186  self.assertTrue(tag != None)
187  parent_gts = tag.parent_global_tags()
188  self.assertEqual(parent_gts.__class__.__name__, "json_list")
189 
191  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
192  payload = self.connection.payload(hash=payload_hash)
193  self.assertTrue(payload != None)
194  parent_tags = payload.parent_tags()
195  self.assertEqual(parent_tags.__class__.__name__, "json_list")
196 
198  global_tag_name = "74X_dataRun1_HLT_frozen_v2"
199  global_tag = self.connection.global_tag(name=global_tag_name)
200  self.assertTrue(global_tag != None)
201  iovs = global_tag.iovs(valid=True)
202 
204  global_tag_name = "74X_dataRun1_HLT_frozen_v2"
205  global_tag = self.connection.global_tag(name=global_tag_name)
206  self.assertTrue(global_tag != None)
207  iovs = global_tag.iovs(valid=True)
208  self.assertEqual(iovs.__class__.__name__, "json_list")
209  snapshot_time = global_tag.snapshot_time
210  for iov in iovs:
211  insertion_time_as_datetime = datetime.datetime.strptime(iov.insertion_time, '%Y-%m-%d %H:%M:%S,%f')
212  self.assertTrue(insertion_time_as_datetime < snapshot_time)
213 
def tearDown(self):
    """Close the session of ``self.connection`` after each test."""
    close = self.connection.close_session
    close()
216 
217 class data_formats_tests(unittest.TestCase):
218 
def setUp(self):
    """Create the connection the tests in this class work with."""
    settings = {}
    settings["db_alias"] = "orapro"
    settings["host"] = "oracle"
    settings["schema"] = "cms_conditions"
    settings["secrets"] = secrets_file
    self.connection = querying.connect(settings)
222 
224  tags = self.connection.tag().all(10)
225  list_of_dicts = data_formats._objects_to_dicts(tags)
226  self.assertEqual(list_of_dicts.__class__.__name__, "json_list")
227  for tag in tags:
228  self.assertEqual(tag.__class__.__name__.lower(), "tag")
229 
231  models_to_test = map(self.connection.model, ["global_tag", "tag", "payload", "iov"])
232  for model in models_to_test:
233  model_name = self.connection.class_name_to_column(model).lower()
234  objects = getattr(self.connection, model_name)().all(5)
235  dicts = data_formats._objects_to_dicts(objects)
236  orm_objects = data_formats._dicts_to_orm_objects(self.connection.model(model_name), dicts)
237  self.assertTrue(dicts != None)
238  for obj in orm_objects:
239  self.assertEqual(self.connection.class_name_to_column(obj.__class__).lower(), model_name)
240  headers = model.headers
241  for header in headers:
242  try:
243  test = getattr(obj, header)
244  header_exists = True
245  except:
246  print("'%s' doesn't exist." % header)
247  header_exists = False
248  self.assertTrue(header_exists)
249 
def tearDown(self):
    """Release the connection by closing its session."""
    connection = self.connection
    connection.close_session()
252 
class shell_tests(unittest.TestCase):
    """Check that a connection can be opened through the ``shell`` module."""

    @staticmethod
    def _connection_data():
        """Return the settings dictionary used for every connection in this class."""
        return {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}

    def setUp(self):
        """Open the connection that ``tearDown`` closes after each test."""
        self.connection = querying.connect(self._connection_data())

    def test_init_shell(self):
        """``shell.connect`` must return an object rather than ``None``."""
        connection = shell.connect(self._connection_data())
        # This second connection is separate from the one made in setUp, so
        # tearDown does not close it. Register its close_session (when the
        # returned object offers one) so it is released even if the
        # assertion below fails.
        # NOTE(review): assumes shell.connect returns the same kind of
        # connection object as querying.connect - confirm.
        close_session = getattr(connection, "close_session", None)
        if callable(close_session):
            self.addCleanup(close_session)
        self.assertIsNotNone(connection)

    def tearDown(self):
        """Close the session of the connection opened in ``setUp``."""
        self.connection.close_session()
266 
267 class connection_tests(unittest.TestCase):
268 
def setUp(self):
    """Nothing to prepare; no shared state is created for this class."""
    return None
271 
273  connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
274  connection = querying.connect(connection_data)
275  self.assertTrue(connection != None)
276  # can cause failure if the database is down
277  close_session_result = connection.close_session()
278  self.assertEqual(close_session_result, True)
279 
280  """def test_oradev_connect(self):
281  connection_data = {"db_alias" : "oradev", "host" : "oracle", "schema" : "", "secrets" : secrets_file_1}
282  connection = querying.connect(connection_data)
283  self.assertTrue(connection != None)
284  # can cause failure if the database is down
285  close_session_result = connection.close_session()
286  self.assertEqual(close_session_result, True)"""
287 
289  for alias in ["pro", "arc", "int", "dev"]:
290  connection_data = {"db_alias" : alias, "host" : "frontier", "schema" : "cms_conditions"}
291  connection = querying.connect(connection_data)
292  # can cause failure if the database is down
293  close_session_result = connection.close_session()
294  self.assertEqual(close_session_result, True)
295 
297  connection_data = {"db_alias" : "pro", "host" : "frontier", "schema" : "cms_conditions"}
298  connection = querying.connect(connection_data)
299  # query for a tag
300  tag = connection.tag(name="EBAlignment_measured_v06_offline")
301  # can cause failure if the database is down
302  close_session_result = connection.close_session()
303  self.assertEqual(close_session_result, True)
304 
def tearDown(self):
    """Nothing to clean up; ``setUp`` creates no shared state."""
    return None
307 
308 class script_tests(unittest.TestCase):
309 
def setUp(self):
    """Store the connection settings on ``self.connection_data``."""
    self.connection_data = dict(
        db_alias="orapro",
        host="oracle",
        schema="cms_conditions",
        secrets=secrets_file,
    )
313 
def test_script(self):
    """Run a small script object through the API and check its first result.

    The script looks up the "EBAlignment_hlt" tag and returns its IOVs.
    The payload hash of the first returned item is compared with a
    hard-coded value.
    """
    class script():
        def script(self_instance, connection):
            # Use the connection passed in as an argument. The parameter
            # was previously ignored in favour of the enclosing test
            # case's own ``self.connection``.
            # NOTE(review): presumably run_script supplies this argument -
            # confirm against querying_framework_api.
            tag = connection.tag(name="EBAlignment_hlt")
            valid_iovs = tag.iovs()
            return valid_iovs
    # NOTE(review): `api` is bound at import time to
    # `querying_framework_api`; confirm that name is callable.
    api_obj = api(self.connection_data)
    data = api_obj.run_script(script(), output=False)
    #pprint.pprint(data.data())
    self.assertEqual(data.get(0).data().payload_hash, "1480c559bbbdacfec514c3cbcf2eb978403efd74")
324 
326  class script():
327  @data_formats.objects_to_dicts
328  def script(self_instance, connection):
329  tag = self.connection.tag(name="EBAlignment_hlt")
330  valid_iovs = tag.iovs()
331  return valid_iovs
332  api_obj = api(self.connection_data)
333  data = api_obj.run_script(script(), output=False)
334  #pprint.pprint(data.data())
335  self.assertEqual(data.get(0, "payload_hash").data(), "1480c559bbbdacfec514c3cbcf2eb978403efd74")
336 
def tearDown(self):
    """Close the session of the connection held on ``self.connection``."""
    held_connection = self.connection
    held_connection.close_session()
def connect
Definition: shell.py:10
std::string print(const Track &, edm::Verbosity=edm::Concise)
Track print utility.
Definition: print.cc:10
def _dicts_to_orm_objects
def _objects_to_dicts
Definition: data_formats.py:96
def connect
Definition: querying.py:352
tuple api
Definition: doHarvest.py:42
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run