CMS 3D CMS Logo

querying_tests.py
Go to the documentation of this file.
1 import unittest
2 import sys
3 import datetime
4 import pprint
5 import subprocess
6 import os
7 
8 import CondCore.Utilities.CondDBFW.querying as querying
9 import CondCore.Utilities.CondDBFW.data_sources as data_sources
10 import CondCore.Utilities.CondDBFW.data_formats as data_formats
11 import CondCore.Utilities.CondDBFW.shell as shell
12 import CondCore.Utilities.CondDBFW.models as models
13 
14 from CondCore.Utilities.CondDBFW.utils import to_timestamp, to_datetime, friendly_since
15 from CondCore.Utilities.CondDBFW.models import Range, Radius
16 
17 prod_connection_string = "frontier://FrontierProd/CMS_CONDITIONS"
18 secrets_source = None
19 
20 class querying_tests(unittest.TestCase):
21  def setUp(self):
22  self.connection = querying.connect(prod_connection_string, secrets=secrets_source)
23  self.global_tag_name = "74X_dataRun1_HLT_frozen_v2"
25  self.assertTrue(self.connection != None)
26  def tearDown(self):
27  self.connection.tear_down()
28 
class factory_tests(querying_tests):
    """
    Factory

    Checks the model dictionary, the proxy methods and the object factory
    exposed by self.connection.
    """

    # This must be a TestCase subclass rather than a plain function: test_*
    # functions nested inside a function are never collected by unittest.

    def test_check_keys_in_models(self):
        """
        Verifies that each key that is required is present in self.connection's model dictionary,
        and also that self.connection has a proxy method for each model that is generated.
        """
        keys = ["globaltag", "globaltagmap", "tag", "iov", "payload"]
        for key in keys:
            self.assertIn(key, self.connection.models.keys())
        proxy_method_names = ["global_tag", "global_tag_map", "tag", "iov", "payload"]
        for name in proxy_method_names:
            self.assertTrue(hasattr(self.connection, name))

    def test_raw_factory_calls_all_models(self):
        """
        Verifies that the factory object held by the connection generates classes belonging to the type
        held by self.connection.
        """
        keys = ["globaltag", "globaltagmap", "tag", "iov", "payload"]
        for key in keys:
            self.assertIsInstance(self.connection.factory.object(key), self.connection.models[key])
55 
class global_tag_tests(querying_tests):
    """
    Global Tags

    Exercises the global tag objects returned by self.connection.global_tag:
    lookup by name, empty objects, listing, dictionary conversion, tag maps,
    IOV range queries and differences between two global tags.
    """

    # This must be a TestCase subclass rather than a plain function: test_*
    # functions nested inside a function are never collected by unittest.

    def test_get_global_tag(self):
        """
        Fetches a global tag by name and checks its name, its tags and its model type.
        """
        # hard coded global tag for now
        global_tag = self.connection.global_tag(name=self.global_tag_name)
        self.assertIsNotNone(global_tag.name)
        self.assertIsNotNone(global_tag.tags())
        self.assertIsInstance(global_tag, self.connection.models["globaltag"])

    def test_get_empty_global_tag(self):
        """
        Verifies that a global tag requested without parameters is an empty globaltag model object.
        """
        empty_gt = self.connection.global_tag()
        self.assertIsInstance(empty_gt, self.connection.models["globaltag"])
        self.assertTrue(empty_gt.empty)

    def test_all_global_tags_empty_gt(self):
        """
        Verifies that all(amount=10) called on an empty global tag returns a non-empty result.
        """
        empty_gt = self.connection.global_tag()
        all_gts = empty_gt.all(amount=10)
        self.assertIsNotNone(all_gts)
        self.assertNotEqual(len(all_gts.data()), 0)

    def test_all_method_parameters(self):
        """
        Verifies that all(insertion_time=Range(...)) only returns global tags whose
        insertion time lies inside the requested range.  Skipped on frontier connections.
        """
        if self.connection.connection_data["host"].lower() == "frontier":
            # report the test as skipped instead of silently passing
            self.skipTest("Cannot query for timestamps on frontier connection.")
        empty_gt = self.connection.global_tag()
        sample_time = datetime.datetime(year=2016, month=1, day=1)
        now = datetime.datetime.now()
        time_range = Range(sample_time, now)
        all_gts_in_interval = empty_gt.all(insertion_time=time_range).data()
        for gt in all_gts_in_interval:
            self.assertTrue(sample_time <= gt.insertion_time <= now)

    def test_as_dicts_method_without_timestamps_conversion(self):
        """
        Verifies that as_dicts(convert_timestamps=False) returns a dictionary whose
        timestamps are still datetime objects.
        """
        global_tag = self.connection.global_tag(name=self.global_tag_name)
        dict_form = global_tag.as_dicts(convert_timestamps=False)
        self.assertIsInstance(dict_form, dict)
        # assertTrue(value, type) would treat the type as the failure message
        # and pass for any truthy value, so the type is checked explicitly
        self.assertIsInstance(dict_form["insertion_time"], datetime.datetime)
        self.assertIsInstance(dict_form["snapshot_time"], datetime.datetime)

    def test_as_dicts_method_with_timestamps_conversion(self):
        """
        Verifies that as_dicts(convert_timestamps=True) returns a dictionary whose
        timestamps are strings encoding the original datetime objects.
        """
        global_tag = self.connection.global_tag(name=self.global_tag_name)
        dict_form = global_tag.as_dicts(convert_timestamps=True)
        self.assertIsInstance(dict_form, dict)
        # assertTrue(value, type) would treat the type as the failure message
        # and pass for any truthy value, so the type is checked explicitly
        self.assertIsInstance(dict_form["insertion_time"], str)
        self.assertIsInstance(dict_form["snapshot_time"], str)

        # now check that the timestamp encoded in the strings is the same as the
        # datetime object that were converted
        self.assertEqual(to_datetime(dict_form["insertion_time"]), global_tag.insertion_time)
        self.assertEqual(to_datetime(dict_form["snapshot_time"]), global_tag.snapshot_time)

    def test_get_tag_maps_in_global_tag(self):
        """
        Verifies that tags() returns a non-empty json_list of globaltagmap model objects.
        """
        global_tag = self.connection.global_tag(name=self.global_tag_name)

        # get global tag maps
        global_tag_maps = global_tag.tags()
        self.assertIsInstance(global_tag_maps, data_sources.json_list)
        global_tag_maps_list = global_tag_maps.data()
        self.assertIsInstance(global_tag_maps_list, list)
        self.assertNotEqual(len(global_tag_maps_list), 0)

        for gt_map in global_tag_maps_list:
            self.assertIsInstance(gt_map, self.connection.models["globaltagmap"])

    def test_get_tag_maps_in_global_tag_with_parameters(self):
        """
        Verifies that tags(record=...) returns a json_list holding exactly three maps
        for the record "AlCaRecoTriggerBitsRcd".
        """
        global_tag = self.connection.global_tag(name=self.global_tag_name)

        global_tag_maps_specific_record = global_tag.tags(record="AlCaRecoTriggerBitsRcd")

        self.assertIsInstance(global_tag_maps_specific_record, data_sources.json_list)

        gt_maps_spec_record_list = global_tag_maps_specific_record.data()
        self.assertIsInstance(gt_maps_spec_record_list, list)
        # this global tag is old, so this test is unlikely to fail since
        # its global tag maps are not expected to change
        self.assertEqual(len(gt_maps_spec_record_list), 3)

    def test_get_all_iovs_within_range(self):
        """
        Verifies that iovs(since=range) only returns iov model objects whose since
        lies between 200000 and 300000 (inclusive).
        """
        global_tag = self.connection.global_tag(name=self.global_tag_name)

        since_range = self.connection.range(200000, 300000)
        iovs = global_tag.iovs(since=since_range)
        self.assertIsInstance(iovs, data_sources.json_list)
        iovs_list = iovs.data()
        self.assertIsInstance(iovs_list, list)

        for iov in iovs_list:
            self.assertIsInstance(iov, self.connection.models["iov"])
            self.assertTrue(200000 <= iov.since <= 300000)

    def test_gt_diff(self):
        """
        Verifies that gt1 - gt2 and gt1.diff(gt2) both give json_lists of equal length
        in which every entry holds different tags for the two global tags.
        """
        gt1 = self.connection.global_tag(name="74X_dataRun1_v1")
        gt2 = self.connection.global_tag(name="74X_dataRun1_v3")
        difference_by_arithmetic = gt1 - gt2
        difference_by_method = gt1.diff(gt2)

        # verify both methods of difference
        self.assertIsInstance(difference_by_arithmetic, data_sources.json_list)
        self.assertIsInstance(difference_by_method, data_sources.json_list)

        difference_arithmetic_list = difference_by_arithmetic.data()
        difference_method_list = difference_by_method.data()
        self.assertIsInstance(difference_arithmetic_list, list)
        self.assertIsInstance(difference_method_list, list)

        self.assertEqual(len(difference_arithmetic_list), len(difference_method_list))

        gt1_key = "%s Tag" % gt1.name
        gt2_key = "%s Tag" % gt2.name
        # the lengths were asserted equal above, so zip visits every entry of both lists
        for arithmetic_entry, method_entry in zip(difference_arithmetic_list, difference_method_list):
            self.assertNotEqual(arithmetic_entry[gt1_key], arithmetic_entry[gt2_key])
            self.assertNotEqual(method_entry[gt1_key], method_entry[gt2_key])
171 
173 
174 
175  """
176  Global Tag Map
177  """
178 
180  global_tag_name = "74X_dataRun1_HLT_frozen_v2"
181  tag_name = "AlCaRecoTriggerBits_MuonDQM_v1_hlt"
182  gt_map = self.connection.global_tag_map(global_tag_name=self.global_tag_name, tag_name=tag_name)
183  self.assertTrue(isinstance(gt_map, self.connection.models["globaltagmap"]))
184 
186  empty_gt_map = self.connection.global_tag_map()
187  self.assertTrue(isinstance(empty_gt_map, self.connection.models["globaltagmap"]))
188  self.assertTrue(empty_gt_map.empty)
189 
191 
192  """
193  Tags
194  """
195 
196  def test_get_tag(self):
197  # hard code tag for now
198  tag_name = "EBAlignment_measured_v01_express"
199  tag = self.connection.tag(name=tag_name)
200  # this tag exists, so shouldn't be NoneType
201  # we gave the name, so that should at least be in the tag object
202  self.assertTrue(tag.name != None)
203  self.assertTrue(isinstance(tag, self.connection.models["tag"]))
204 
206  tag = self.connection.tag()
207  self.assertTrue(isinstance(tag, self.connection.models["tag"]))
208  self.assertTrue(tag.empty)
209 
211  tag_name = "EBAlignment_measured_v01_express"
212  tag = self.connection.tag(name=tag_name)
213  self.assertTrue(tag != None)
214  parent_global_tags = tag.parent_global_tags()
215  self.assertTrue(parent_global_tags != None)
216 
218  empty_tag = self.connection.tag()
219  all_tags = empty_tag.all(amount=10)
220  self.assertTrue(all_tags != None)
221  # there are always tags in the db
222  self.assertTrue(len(all_tags.data()) != 0)
223 
225  tag_name = "EBAlignment_measured_v01_express"
226  empty_tag = self.connection.tag(name=tag_name)
227  all_tags = empty_tag.all(amount=10)
228  self.assertTrue(all_tags != None)
229  # there are always tags in the db
230  self.assertTrue(len(all_tags.data()) != 0)
231 
233 
234  """
235  IOVs
236  """
237 
238  def test_get_iov(self):
239  tag_name = "EBAlignment_measured_v01_express"
240  tag = self.connection.tag(name=tag_name)
241  iovs = tag.iovs()
242  self.assertTrue(isinstance(iovs, data_sources.json_list))
243  raw_list = iovs.data()
244  self.assertTrue(isinstance(raw_list, list))
245  first_iov = raw_list[0]
246  self.assertTrue(isinstance(first_iov, self.connection.models["iov"]))
247 
249  tag_name = "EBAlignment_measured_v01_express"
250  iovs = self.connection.iov(tag_name=tag_name)
251  self.assertTrue(isinstance(iovs, data_sources.json_list))
252  raw_list = iovs.data()
253  self.assertTrue(isinstance(raw_list, list))
254  first_iov = raw_list[0]
255  self.assertTrue(isinstance(first_iov, self.connection.models["iov"]))
256 
258  empty_iov = self.connection.iov()
259  self.assertTrue(isinstance(empty_iov, self.connection.models["iov"]))
260  self.assertTrue(empty_iov.empty)
261 
263 
264  """
265  Payloads
266  """
267 
268  def test_get_payload(self):
269  # hard coded payload for now
270  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
271  payload = self.connection.payload(hash=payload_hash)
272  self.assertTrue(isinstance(payload, self.connection.models["payload"]))
273 
275  payload = self.connection.payload()
276  self.assertTrue(isinstance(payload, self.connection.models["payload"]))
277  self.assertTrue(payload.empty)
278 
280  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
281  payload = self.connection.payload(hash=payload_hash)
282  self.assertTrue(payload != None)
283  parent_tags = payload.parent_tags()
284  self.assertTrue(parent_tags != None)
285 
287  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
288  payload = self.connection.payload(hash=payload_hash)
289  self.assertTrue(payload != None)
290  parent_tags = payload.parent_tags()
291  self.assertTrue(isinstance(parent_tags, data_sources.json_list))
292 
294 
295  """
296  Misc
297  """
298 
300  string_for_non_empty_result = "ecal"
301  data = self.connection.search_everything(string_for_non_empty_result)
302  self.assertTrue(len(data.get("global_tags").data()) != 0)
303  self.assertTrue(len(data.get("tags").data()) != 0)
304  self.assertTrue(len(data.get("payloads").data()) != 0)
305 
307 
308  """
309  Results types
310  """
311 
313  tags = self.connection.tag(time_type="Run")
314  self.assertTrue(len(tags.data()) > 1)
315 
317  tag = self.connection.tag()
318  self.assertTrue(tag.empty)
319 
321  tag = self.connection.tag(name="dfasdf")
322  self.assertTrue(tag == None)
323 
324  def tearDown(self):
325  self.connection.tear_down()
def test_all_tags_non_empty_tag(self)
def factory_tests(querying_tests)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:450
std::string print(const Track &, edm::Verbosity=edm::Concise)
Track print utility.
Definition: print.cc:10
PixelRecoRange< float > Range
def test_get_parent_global_tags(self)
def to_datetime(date_string)
Definition: utils.py:12
def test_get_iovs_by_iov_query(self)
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
def global_tag_tests(querying_tests)