CMS 3D CMS Logo

querying_tests.py
Go to the documentation of this file.
1 from __future__ import print_function
2 import unittest
3 import sys
4 import datetime
5 import pprint
6 import subprocess
7 import os
8 
9 import CondCore.Utilities.CondDBFW.querying as querying
10 import CondCore.Utilities.CondDBFW.data_sources as data_sources
11 import CondCore.Utilities.CondDBFW.data_formats as data_formats
12 import CondCore.Utilities.CondDBFW.shell as shell
13 import CondCore.Utilities.CondDBFW.models as models
14 
15 from CondCore.Utilities.CondDBFW.utils import to_timestamp, to_datetime, friendly_since
16 from CondCore.Utilities.CondDBFW.models import Range, Radius
17 
18 prod_connection_string = "frontier://FrontierProd/CMS_CONDITIONS"
19 secrets_source = None
20 
class querying_tests(unittest.TestCase):
    """
    Base test case shared by the test groups in this file.

    setUp stores a connection obtained from querying.connect on the
    instance, together with the global tag name the tests look up;
    tearDown calls tear_down() on that connection.
    """

    def setUp(self):
        """Connect using prod_connection_string and record the global tag name."""
        self.connection = querying.connect(prod_connection_string, secrets=secrets_source)
        self.global_tag_name = "74X_dataRun1_HLT_frozen_v2"
        # NOTE(review): the listing this was taken from skips a line number
        # right before this assertion, so it may originally have lived in a
        # separate test method -- confirm against the original file.
        self.assertIsNotNone(self.connection)

    def tearDown(self):
        """Call tear_down() on the connection created in setUp."""
        self.connection.tear_down()
29 
class factory_tests(querying_tests):
    """
    Factory

    Checks on the connection's model dictionary, its proxy methods and its
    factory object.

    Declared as a class: with ``def`` the test functions below were only
    nested locals and unittest never collected them.
    """

    def test_check_keys_in_models(self):
        """
        Verifies that each key that is required is present in self.connection's model dictionary,
        and also that self.connection has a proxy method for each model that is generated.
        """
        keys = ["globaltag", "globaltagmap", "tag", "iov", "payload"]
        for key in keys:
            self.assertIn(key, self.connection.models.keys())
        proxy_method_names = ["global_tag", "global_tag_map", "tag", "iov", "payload"]
        for name in proxy_method_names:
            self.assertTrue(hasattr(self.connection, name))

    def test_raw_factory_calls_all_models(self):
        """
        Verifies that the factory object held by the connection generates classes belonging to the type
        held by self.connection.
        """
        keys = ["globaltag", "globaltagmap", "tag", "iov", "payload"]
        for key in keys:
            self.assertIsInstance(self.connection.factory.object(key), self.connection.models[key])
56 
class global_tag_tests(querying_tests):
    """
    Global Tags

    Tests for global tag objects obtained through self.connection.

    Declared as a class: with ``def`` the test functions below were only
    nested locals and unittest never collected them.
    """

    def test_get_global_tag(self):
        """A global tag fetched by name has a name, tags, and the expected model type."""
        # hard coded global tag for now
        global_tag = self.connection.global_tag(name=self.global_tag_name)
        self.assertIsNotNone(global_tag.name)
        self.assertIsNotNone(global_tag.tags())
        self.assertIsInstance(global_tag, self.connection.models["globaltag"])

    def test_get_empty_global_tag(self):
        """Calling global_tag() without arguments gives an object flagged as empty."""
        empty_gt = self.connection.global_tag()
        self.assertIsInstance(empty_gt, self.connection.models["globaltag"])
        self.assertTrue(empty_gt.empty)

    def test_all_global_tags_empty_gt(self):
        """all(amount=10) on an empty global tag returns a non-empty result."""
        empty_gt = self.connection.global_tag()
        all_gts = empty_gt.all(amount=10)
        self.assertIsNotNone(all_gts)
        self.assertNotEqual(len(all_gts.data()), 0)

    def test_all_method_parameters(self):
        """all(insertion_time=Range) only returns global tags inserted inside the range."""
        if self.connection.connection_data["host"].lower() == "frontier":
            # Report the test as skipped instead of silently passing.
            self.skipTest("Cannot query for timestamps on frontier connection.")
        empty_gt = self.connection.global_tag()
        sample_time = datetime.datetime(year=2016, month=1, day=1)
        now = datetime.datetime.now()
        time_range = Range(sample_time, now)
        all_gts_in_interval = empty_gt.all(insertion_time=time_range).data()
        for gt in all_gts_in_interval:
            self.assertTrue(sample_time <= gt.insertion_time <= now)

    def test_as_dicts_method_without_timestamps_conversion(self):
        """as_dicts(convert_timestamps=False) keeps the timestamps as datetime objects."""
        global_tag = self.connection.global_tag(name=self.global_tag_name)
        dict_form = global_tag.as_dicts(convert_timestamps=False)
        self.assertIsInstance(dict_form, dict)
        # These were assertTrue(value, type): the type was only used as the
        # failure message, so nothing was actually type-checked.
        self.assertIsInstance(dict_form["insertion_time"], datetime.datetime)
        self.assertIsInstance(dict_form["snapshot_time"], datetime.datetime)

    def test_as_dicts_method_with_timestamps_conversion(self):
        """as_dicts(convert_timestamps=True) gives strings that parse back to the same times."""
        global_tag = self.connection.global_tag(name=self.global_tag_name)
        dict_form = global_tag.as_dicts(convert_timestamps=True)
        self.assertIsInstance(dict_form, dict)
        # Same fix as above: check the type instead of passing it as a message.
        self.assertIsInstance(dict_form["insertion_time"], str)
        self.assertIsInstance(dict_form["snapshot_time"], str)

        # now check that the timestamp encoded in the strings is the same as the
        # datetime object that were converted
        self.assertEqual(to_datetime(dict_form["insertion_time"]), global_tag.insertion_time)
        self.assertEqual(to_datetime(dict_form["snapshot_time"]), global_tag.snapshot_time)

    def test_get_tag_maps_in_global_tag(self):
        """tags() returns a non-empty json_list of global tag map objects."""
        global_tag = self.connection.global_tag(name=self.global_tag_name)

        # get global tag maps
        global_tag_maps = global_tag.tags()
        self.assertIsInstance(global_tag_maps, data_sources.json_list)
        global_tag_maps_list = global_tag_maps.data()
        self.assertIsInstance(global_tag_maps_list, list)
        self.assertNotEqual(len(global_tag_maps_list), 0)

        for gt_map in global_tag_maps_list:
            self.assertIsInstance(gt_map, self.connection.models["globaltagmap"])

    def test_get_tag_maps_in_global_tag_with_parameters(self):
        """tags(record=...) returns the maps for that record only."""
        global_tag = self.connection.global_tag(name=self.global_tag_name)

        global_tag_maps_specific_record = global_tag.tags(record="AlCaRecoTriggerBitsRcd")

        self.assertIsInstance(global_tag_maps_specific_record, data_sources.json_list)

        gt_maps_spec_record_list = global_tag_maps_specific_record.data()
        self.assertIsInstance(gt_maps_spec_record_list, list)
        # this global tag is old, so the hard-coded count below is presumably
        # stable (its global tag maps are not expected to change)
        self.assertEqual(len(gt_maps_spec_record_list), 3)

    def test_get_all_iovs_within_range(self):
        """iovs(since=range) only returns IOVs whose since lies inside the range."""
        global_tag = self.connection.global_tag(name=self.global_tag_name)

        lower, upper = 200000, 300000
        since_range = self.connection.range(lower, upper)
        iovs = global_tag.iovs(since=since_range)
        self.assertIsInstance(iovs, data_sources.json_list)
        iovs_list = iovs.data()
        self.assertIsInstance(iovs_list, list)

        for iov in iovs_list:
            self.assertIsInstance(iov, self.connection.models["iov"])
            self.assertTrue(lower <= iov.since <= upper)

    def test_gt_diff(self):
        """gt1 - gt2 and gt1.diff(gt2) give equally long lists of differing entries."""
        gt1 = self.connection.global_tag(name="74X_dataRun1_v1")
        gt2 = self.connection.global_tag(name="74X_dataRun1_v3")
        difference_by_arithmetic = gt1 - gt2
        difference_by_method = gt1.diff(gt2)

        # verify both methods of difference
        self.assertIsInstance(difference_by_arithmetic, data_sources.json_list)
        self.assertIsInstance(difference_by_method, data_sources.json_list)

        difference_arithmetic_list = difference_by_arithmetic.data()
        difference_method_list = difference_by_method.data()
        self.assertIsInstance(difference_arithmetic_list, list)
        self.assertIsInstance(difference_method_list, list)

        self.assertEqual(len(difference_arithmetic_list), len(difference_method_list))

        # Each entry is indexed by "<global tag name> Tag"; build the keys once.
        gt1_key = "%s Tag" % gt1.name
        gt2_key = "%s Tag" % gt2.name
        # The lengths were asserted equal above, so zip drops nothing.
        for by_arithmetic, by_method in zip(difference_arithmetic_list, difference_method_list):
            self.assertNotEqual(by_arithmetic[gt1_key], by_arithmetic[gt2_key])
            self.assertNotEqual(by_method[gt1_key], by_method[gt2_key])
172 
174 
175 
176  """
177  Global Tag Map
178  """
179 
181  global_tag_name = "74X_dataRun1_HLT_frozen_v2"
182  tag_name = "AlCaRecoTriggerBits_MuonDQM_v1_hlt"
183  gt_map = self.connection.global_tag_map(global_tag_name=self.global_tag_name, tag_name=tag_name)
184  self.assertTrue(isinstance(gt_map, self.connection.models["globaltagmap"]))
185 
187  empty_gt_map = self.connection.global_tag_map()
188  self.assertTrue(isinstance(empty_gt_map, self.connection.models["globaltagmap"]))
189  self.assertTrue(empty_gt_map.empty)
190 
192 
193  """
194  Tags
195  """
196 
197  def test_get_tag(self):
198  # hard code tag for now
199  tag_name = "EBAlignment_measured_v01_express"
200  tag = self.connection.tag(name=tag_name)
201  # this tag exists, so shouldn't be NoneType
202  # we gave the name, so that should at least be in the tag object
203  self.assertTrue(tag.name != None)
204  self.assertTrue(isinstance(tag, self.connection.models["tag"]))
205 
207  tag = self.connection.tag()
208  self.assertTrue(isinstance(tag, self.connection.models["tag"]))
209  self.assertTrue(tag.empty)
210 
212  tag_name = "EBAlignment_measured_v01_express"
213  tag = self.connection.tag(name=tag_name)
214  self.assertTrue(tag != None)
215  parent_global_tags = tag.parent_global_tags()
216  self.assertTrue(parent_global_tags != None)
217 
219  empty_tag = self.connection.tag()
220  all_tags = empty_tag.all(amount=10)
221  self.assertTrue(all_tags != None)
222  # there are always tags in the db
223  self.assertTrue(len(all_tags.data()) != 0)
224 
226  tag_name = "EBAlignment_measured_v01_express"
227  empty_tag = self.connection.tag(name=tag_name)
228  all_tags = empty_tag.all(amount=10)
229  self.assertTrue(all_tags != None)
230  # there are always tags in the db
231  self.assertTrue(len(all_tags.data()) != 0)
232 
234 
235  """
236  IOVs
237  """
238 
239  def test_get_iov(self):
240  tag_name = "EBAlignment_measured_v01_express"
241  tag = self.connection.tag(name=tag_name)
242  iovs = tag.iovs()
243  self.assertTrue(isinstance(iovs, data_sources.json_list))
244  raw_list = iovs.data()
245  self.assertTrue(isinstance(raw_list, list))
246  first_iov = raw_list[0]
247  self.assertTrue(isinstance(first_iov, self.connection.models["iov"]))
248 
250  tag_name = "EBAlignment_measured_v01_express"
251  iovs = self.connection.iov(tag_name=tag_name)
252  self.assertTrue(isinstance(iovs, data_sources.json_list))
253  raw_list = iovs.data()
254  self.assertTrue(isinstance(raw_list, list))
255  first_iov = raw_list[0]
256  self.assertTrue(isinstance(first_iov, self.connection.models["iov"]))
257 
259  empty_iov = self.connection.iov()
260  self.assertTrue(isinstance(empty_iov, self.connection.models["iov"]))
261  self.assertTrue(empty_iov.empty)
262 
264 
265  """
266  Payloads
267  """
268 
269  def test_get_payload(self):
270  # hard coded payload for now
271  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
272  payload = self.connection.payload(hash=payload_hash)
273  self.assertTrue(isinstance(payload, self.connection.models["payload"]))
274 
276  payload = self.connection.payload()
277  self.assertTrue(isinstance(payload, self.connection.models["payload"]))
278  self.assertTrue(payload.empty)
279 
281  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
282  payload = self.connection.payload(hash=payload_hash)
283  self.assertTrue(payload != None)
284  parent_tags = payload.parent_tags()
285  self.assertTrue(parent_tags != None)
286 
288  payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
289  payload = self.connection.payload(hash=payload_hash)
290  self.assertTrue(payload != None)
291  parent_tags = payload.parent_tags()
292  self.assertTrue(isinstance(parent_tags, data_sources.json_list))
293 
295 
296  """
297  Misc
298  """
299 
301  string_for_non_empty_result = "ecal"
302  data = self.connection.search_everything(string_for_non_empty_result)
303  self.assertTrue(len(data.get("global_tags").data()) != 0)
304  self.assertTrue(len(data.get("tags").data()) != 0)
305  self.assertTrue(len(data.get("payloads").data()) != 0)
306 
308 
309  """
310  Results types
311  """
312 
314  tags = self.connection.tag(time_type="Run")
315  self.assertTrue(len(tags.data()) > 1)
316 
318  tag = self.connection.tag()
319  self.assertTrue(tag.empty)
320 
322  tag = self.connection.tag(name="dfasdf")
323  self.assertTrue(tag == None)
324 
325  def tearDown(self):
326  self.connection.tear_down()
def test_all_tags_non_empty_tag(self)
def factory_tests(querying_tests)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:451
PixelRecoRange< float > Range
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
def test_get_parent_global_tags(self)
def to_datetime(date_string)
Definition: utils.py:12
def test_get_iovs_by_iov_query(self)
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
def global_tag_tests(querying_tests)