1 from __future__
import print_function
9 import CondCore.Utilities.CondDBFW.querying
as querying
10 import CondCore.Utilities.CondDBFW.data_sources
as data_sources
11 import CondCore.Utilities.CondDBFW.data_formats
as data_formats
12 import CondCore.Utilities.CondDBFW.shell
as shell
13 import CondCore.Utilities.CondDBFW.models
as models
15 from CondCore.Utilities.CondDBFW.utils
import to_timestamp, to_datetime, friendly_since
16 from CondCore.Utilities.CondDBFW.models
import Range, Radius
18 prod_connection_string =
"frontier://FrontierProd/CMS_CONDITIONS" 28 self.connection.tear_down()
def test_check_keys_in_models(self):
    """
    Verifies that each key that is required is present in self.connection's model dictionary,
    and also that self.connection has a proxy method for each model that is generated.
    """
    keys = ["globaltag", "globaltagmap", "tag", "iov", "payload"]
    # Every required model key must be checked individually, hence the loop.
    for key in keys:
        self.assertIn(key, self.connection.models)

    proxy_method_names = ["global_tag", "global_tag_map", "tag", "iov", "payload"]
    for name in proxy_method_names:
        self.assertTrue(hasattr(self.connection, name))
def test_raw_factory_calls_all_models(self):
    """
    Verifies that the factory object held by the connection generates classes belonging to the type
    held by self.connection.
    """
    keys = ["globaltag", "globaltagmap", "tag", "iov", "payload"]
    # For each model key, the factory's product must be an instance of the
    # class registered under that same key in the connection's models.
    for key in keys:
        self.assertIsInstance(self.connection.factory.object(key), self.connection.models[key])
def test_get_global_tag(self):
    """
    Fetches the global tag named by self.global_tag_name and checks that it has a name,
    that tags() returns something, and that it is an instance of the "globaltag" model.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    self.assertIsNotNone(global_tag.name)
    self.assertIsNotNone(global_tag.tags())
    self.assertIsInstance(global_tag, self.connection.models["globaltag"])
def test_get_empty_global_tag(self):
    """
    Requests a global tag with no arguments and checks that the result is a "globaltag"
    model instance whose `empty` attribute is truthy.
    """
    empty_gt = self.connection.global_tag()
    self.assertIsInstance(empty_gt, self.connection.models["globaltag"])
    self.assertTrue(empty_gt.empty)
def test_all_global_tags_empty_gt(self):
    """
    Calls all(amount=10) on an argument-less global tag and checks that the result exists
    and that its data() is not empty.
    """
    empty_gt = self.connection.global_tag()
    all_gts = empty_gt.all(amount=10)
    self.assertIsNotNone(all_gts)
    self.assertNotEqual(len(all_gts.data()), 0)
def test_all_method_parameters(self):
    """
    Checks that all(insertion_time=Range(...)) only returns global tags whose insertion_time
    lies between 2016-01-01 and the current time.

    Does nothing on a connection whose host is "frontier" (case-insensitive), beyond
    printing a notice.
    """
    if self.connection.connection_data["host"].lower() == "frontier":
        print("Cannot query for timestamps on frontier connection.")
        # NOTE(review): the listing shows a one-line gap right after this print;
        # bailing out here matches the printed message - confirm against the
        # upstream file.
        return

    empty_gt = self.connection.global_tag()
    sample_time = datetime.datetime(year=2016, month=1, day=1)
    now = datetime.datetime.now()
    time_range = Range(sample_time, now)
    # NOTE(review): time_radius is constructed but never passed to a query in
    # this test; kept so the Radius construction is still executed.
    time_radius = Radius(sample_time, datetime.timedelta(weeks=4))

    all_gts_in_interval = empty_gt.all(insertion_time=time_range).data()
    for gt in all_gts_in_interval:
        self.assertTrue(sample_time <= gt.insertion_time <= now)
def test_as_dicts_method_without_timestamps_conversion(self):
    """
    Checks that as_dicts(convert_timestamps=False) returns a dict whose "insertion_time"
    and "snapshot_time" entries are datetime.datetime objects.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    dict_form = global_tag.as_dicts(convert_timestamps=False)
    self.assertIsInstance(dict_form, dict)
    # assertTrue(value, datetime.datetime) would treat the type as the failure
    # message and only test truthiness; assertIsInstance performs the type check.
    self.assertIsInstance(dict_form["insertion_time"], datetime.datetime)
    self.assertIsInstance(dict_form["snapshot_time"], datetime.datetime)
def test_as_dicts_method_with_timestamps_conversion(self):
    """
    Checks that as_dicts(convert_timestamps=True) returns a dict whose "insertion_time"
    and "snapshot_time" entries are strings, and that to_datetime() turns each string
    back into the corresponding attribute of the global tag.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    dict_form = global_tag.as_dicts(convert_timestamps=True)
    self.assertIsInstance(dict_form, dict)
    # assertTrue(value, str) would treat the type as the failure message and
    # only test truthiness; assertIsInstance performs the type check.
    self.assertIsInstance(dict_form["insertion_time"], str)
    self.assertIsInstance(dict_form["snapshot_time"], str)

    # Round trip: the converted strings must parse back to the original values.
    self.assertEqual(to_datetime(dict_form["insertion_time"]), global_tag.insertion_time)
    self.assertEqual(to_datetime(dict_form["snapshot_time"]), global_tag.snapshot_time)
def test_get_tag_maps_in_global_tag(self):
    """
    Checks that tags() on the global tag named by self.global_tag_name yields a non-empty
    list (via data()) whose elements are all "globaltagmap" model instances.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)

    global_tag_maps = global_tag.tags()

    global_tag_maps_list = global_tag_maps.data()
    self.assertIsInstance(global_tag_maps_list, list)
    self.assertNotEqual(len(global_tag_maps_list), 0)

    gt_map_model = self.connection.models["globaltagmap"]
    for gt_map in global_tag_maps_list:
        self.assertIsInstance(gt_map, gt_map_model)
def test_get_tag_maps_in_global_tag_with_parameters(self):
    """
    Checks that tags(record="AlCaRecoTriggerBitsRcd") on the global tag named by
    self.global_tag_name yields a list (via data()) with exactly 3 entries.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)

    global_tag_maps_specific_record = global_tag.tags(record="AlCaRecoTriggerBitsRcd")

    gt_maps_spec_record_list = global_tag_maps_specific_record.data()
    self.assertIsInstance(gt_maps_spec_record_list, list)
    # assertEqual reports the actual count on failure, unlike assertTrue(len(...) == 3).
    self.assertEqual(len(gt_maps_spec_record_list), 3)
def test_get_all_iovs_within_range(self):
    """
    Checks that iovs(since=<range 200000..300000>) on the global tag named by
    self.global_tag_name yields a list (via data()) of "iov" model instances whose
    `since` lies within the inclusive bounds 200000 and 300000.
    """
    # Bounds are bound once so the query and the assertion cannot drift apart.
    lower_since, upper_since = 200000, 300000

    global_tag = self.connection.global_tag(name=self.global_tag_name)

    since_range = self.connection.range(lower_since, upper_since)
    iovs = global_tag.iovs(since=since_range)

    iovs_list = iovs.data()
    self.assertIsInstance(iovs_list, list)

    iov_model = self.connection.models["iov"]
    for iov in iovs_list:
        self.assertIsInstance(iov, iov_model)
        self.assertTrue(lower_since <= iov.since <= upper_since)
def test_gt_diff(self):
    """
    Compares global tags "74X_dataRun1_v1" and "74X_dataRun1_v3" both with the `-`
    operator and with diff(), then checks that:

    - both results expose a list through data(),
    - the two lists have the same length,
    - in every row of either list, the "<name> Tag" entries of the two global tags differ.
    """
    gt1 = self.connection.global_tag(name="74X_dataRun1_v1")
    gt2 = self.connection.global_tag(name="74X_dataRun1_v3")
    difference_by_arithmetic = gt1 - gt2
    difference_by_method = gt1.diff(gt2)

    difference_arithmetic_list = difference_by_arithmetic.data()
    difference_method_list = difference_by_method.data()
    self.assertIsInstance(difference_arithmetic_list, list)
    self.assertIsInstance(difference_method_list, list)

    self.assertEqual(len(difference_arithmetic_list), len(difference_method_list))

    # Row keys under which each global tag's tag is stored in a diff row.
    gt1_column = "%s Tag" % gt1.name
    gt2_column = "%s Tag" % gt2.name

    # Lengths were asserted equal above, so zip visits every row of both lists.
    for arithmetic_row, method_row in zip(difference_arithmetic_list, difference_method_list):
        self.assertNotEqual(arithmetic_row[gt1_column], arithmetic_row[gt2_column])
        self.assertNotEqual(method_row[gt1_column], method_row[gt2_column])
181 global_tag_name =
"74X_dataRun1_HLT_frozen_v2" 182 tag_name =
"AlCaRecoTriggerBits_MuonDQM_v1_hlt" 183 gt_map = self.connection.global_tag_map(global_tag_name=self.
global_tag_name, tag_name=tag_name)
184 self.assertTrue(isinstance(gt_map, self.connection.models[
"globaltagmap"]))
187 empty_gt_map = self.connection.global_tag_map()
188 self.assertTrue(isinstance(empty_gt_map, self.connection.models[
"globaltagmap"]))
189 self.assertTrue(empty_gt_map.empty)
199 tag_name =
"EBAlignment_measured_v01_express" 200 tag = self.connection.tag(name=tag_name)
203 self.assertTrue(tag.name !=
None)
204 self.assertTrue(isinstance(tag, self.connection.models[
"tag"]))
207 tag = self.connection.tag()
208 self.assertTrue(isinstance(tag, self.connection.models[
"tag"]))
209 self.assertTrue(tag.empty)
212 tag_name =
"EBAlignment_measured_v01_express" 213 tag = self.connection.tag(name=tag_name)
214 self.assertTrue(tag !=
None)
215 parent_global_tags = tag.parent_global_tags()
216 self.assertTrue(parent_global_tags !=
None)
219 empty_tag = self.connection.tag()
220 all_tags = empty_tag.all(amount=10)
221 self.assertTrue(all_tags !=
None)
223 self.assertTrue(len(all_tags.data()) != 0)
226 tag_name =
"EBAlignment_measured_v01_express" 227 empty_tag = self.connection.tag(name=tag_name)
228 all_tags = empty_tag.all(amount=10)
229 self.assertTrue(all_tags !=
None)
231 self.assertTrue(len(all_tags.data()) != 0)
240 tag_name =
"EBAlignment_measured_v01_express" 241 tag = self.connection.tag(name=tag_name)
244 raw_list = iovs.data()
245 self.assertTrue(isinstance(raw_list, list))
246 first_iov = raw_list[0]
247 self.assertTrue(isinstance(first_iov, self.connection.models[
"iov"]))
250 tag_name =
"EBAlignment_measured_v01_express" 251 iovs = self.connection.iov(tag_name=tag_name)
253 raw_list = iovs.data()
254 self.assertTrue(isinstance(raw_list, list))
255 first_iov = raw_list[0]
256 self.assertTrue(isinstance(first_iov, self.connection.models[
"iov"]))
259 empty_iov = self.connection.iov()
260 self.assertTrue(isinstance(empty_iov, self.connection.models[
"iov"]))
261 self.assertTrue(empty_iov.empty)
271 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9" 272 payload = self.connection.payload(hash=payload_hash)
273 self.assertTrue(isinstance(payload, self.connection.models[
"payload"]))
276 payload = self.connection.payload()
277 self.assertTrue(isinstance(payload, self.connection.models[
"payload"]))
278 self.assertTrue(payload.empty)
281 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9" 282 payload = self.connection.payload(hash=payload_hash)
283 self.assertTrue(payload !=
None)
284 parent_tags = payload.parent_tags()
285 self.assertTrue(parent_tags !=
None)
288 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9" 289 payload = self.connection.payload(hash=payload_hash)
290 self.assertTrue(payload !=
None)
291 parent_tags = payload.parent_tags()
301 string_for_non_empty_result =
"ecal" 302 data = self.connection.search_everything(string_for_non_empty_result)
303 self.assertTrue(len(data.get(
"global_tags").
data()) != 0)
304 self.assertTrue(len(data.get(
"tags").
data()) != 0)
305 self.assertTrue(len(data.get(
"payloads").
data()) != 0)
314 tags = self.connection.tag(time_type=
"Run")
315 self.assertTrue(len(tags.data()) > 1)
318 tag = self.connection.tag()
319 self.assertTrue(tag.empty)
322 tag = self.connection.tag(name=
"dfasdf")
323 self.assertTrue(tag ==
None)
326 self.connection.tear_down()
def test_all_tags_non_empty_tag(self)
def factory_tests(querying_tests)
def test_factory_no_result(self)
def test_all_tags_empty_tag(self)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
PixelRecoRange< float > Range
def test_factory_empty_result(self)
def test_get_empty_global_tag_map(self)
S & print(S &os, JobReport::InputFile const &f)
def test_get_empty_tag(self)
def test_get_parent_tags_payload(self)
def test_get_global_tag_map(self)
def test_get_parent_global_tags(self)
def to_datetime(date_string)
def test_get_payload(self)
def test_type_parent_tags(self)
def test_get_empty_iov(self)
def test_factory_multiple_results(self)
def test_get_iovs_by_iov_query(self)
def test_get_empty_payload(self)
char data[epos_bytes_allocation]
def global_tag_tests(querying_tests)
def test_check_connection(self)
def test_search_everything(self)