8 import CondCore.Utilities.CondDBFW.querying
as querying
9 import CondCore.Utilities.CondDBFW.data_sources
as data_sources
10 import CondCore.Utilities.CondDBFW.data_formats
as data_formats
11 import CondCore.Utilities.CondDBFW.shell
as shell
12 import CondCore.Utilities.CondDBFW.models
as models
14 from CondCore.Utilities.CondDBFW.utils
import to_timestamp, to_datetime, friendly_since
15 from CondCore.Utilities.CondDBFW.models
import Range, Radius
17 prod_connection_string =
"frontier://FrontierProd/CMS_CONDITIONS" 27 self.connection.tear_down()
def test_check_keys_in_models(self):
    """
    Verifies that each key that is required is present in self.connection's
    model dictionary, and also that self.connection has a proxy method for
    each model that is generated.
    """
    model_keys = ("globaltag", "globaltagmap", "tag", "iov", "payload")
    # The loop header was absent from the captured listing; it is restored
    # here because the assertion below refers to the loop variable.
    for key in model_keys:
        self.assertIn(key, self.connection.models)

    proxy_method_names = ("global_tag", "global_tag_map", "tag", "iov", "payload")
    for name in proxy_method_names:
        self.assertTrue(
            hasattr(self.connection, name),
            "connection has no proxy method named %r" % name,
        )
def test_raw_factory_calls_all_models(self):
    """
    Verifies that the factory object held by the connection generates
    classes belonging to the type held by self.connection.
    """
    model_keys = ("globaltag", "globaltagmap", "tag", "iov", "payload")
    # The loop header was absent from the captured listing; it is restored
    # here because the assertion below refers to the loop variable.
    for key in model_keys:
        produced = self.connection.factory.object(key)
        self.assertIsInstance(produced, self.connection.models[key])
def test_get_global_tag(self):
    """
    Fetches the global tag named by self.global_tag_name and checks that it
    has a name, that tags() returns something, and that the object is an
    instance of the connection's "globaltag" model.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    # Identity checks against None instead of "!= None", so a custom
    # __ne__ on the returned objects cannot influence the result.
    self.assertIsNotNone(global_tag.name)
    self.assertIsNotNone(global_tag.tags())
    self.assertIsInstance(global_tag, self.connection.models["globaltag"])
def test_get_empty_global_tag(self):
    """
    Calls the global_tag proxy without arguments and checks that the result
    is a "globaltag" model instance whose `empty` attribute is truthy.
    """
    empty_gt = self.connection.global_tag()
    self.assertIsInstance(empty_gt, self.connection.models["globaltag"])
    self.assertTrue(empty_gt.empty)
def test_all_global_tags_empty_gt(self):
    """
    Calls all(amount=10) on a global tag obtained without arguments and
    checks that the result is not None and that its data() is non-empty.
    """
    empty_gt = self.connection.global_tag()
    all_gts = empty_gt.all(amount=10)
    self.assertIsNotNone(all_gts)
    self.assertNotEqual(len(all_gts.data()), 0)
def test_all_method_parameters(self):
    """
    Queries all global tags whose insertion_time lies between 2016-01-01 and
    now, and checks that every returned global tag falls inside that
    interval. Does nothing on a frontier connection.
    """
    if self.connection.connection_data["host"].lower() == "frontier":
        print("Cannot query for timestamps on frontier connection.")
        # NOTE(review): the statement that followed the print was absent
        # from the captured listing. Returning here is presumed, since the
        # message says the query below cannot be run - confirm against the
        # original file.
        return

    empty_gt = self.connection.global_tag()
    sample_time = datetime.datetime(year=2016, month=1, day=1)
    now = datetime.datetime.now()
    time_range = Range(sample_time, now)
    # Not used by any assertion below; kept so that constructing a Radius
    # is still exercised - TODO confirm whether a query with it was intended.
    time_radius = Radius(sample_time, datetime.timedelta(weeks=4))

    all_gts_in_interval = empty_gt.all(insertion_time=time_range).data()
    for gt in all_gts_in_interval:
        self.assertTrue(sample_time <= gt.insertion_time <= now)
def test_as_dicts_method_without_timestamps_conversion(self):
    """
    Checks that as_dicts(convert_timestamps=False) returns a dict whose
    "insertion_time" and "snapshot_time" entries are datetime objects.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    dict_form = global_tag.as_dicts(convert_timestamps=False)
    self.assertIsInstance(dict_form, dict)
    # assertTrue(value, datetime.datetime) would treat the type as the
    # failure message and only test truthiness; assertIsInstance performs
    # the type check the test is named for.
    self.assertIsInstance(dict_form["insertion_time"], datetime.datetime)
    self.assertIsInstance(dict_form["snapshot_time"], datetime.datetime)
def test_as_dicts_method_with_timestamps_conversion(self):
    """
    Checks that as_dicts(convert_timestamps=True) returns a dict whose
    "insertion_time" and "snapshot_time" entries are strings, and that
    to_datetime() turns them back into the global tag's own values.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    dict_form = global_tag.as_dicts(convert_timestamps=True)
    self.assertIsInstance(dict_form, dict)
    # assertTrue(value, str) would treat the type as the failure message
    # and only test truthiness; assertIsInstance performs the type check.
    self.assertIsInstance(dict_form["insertion_time"], str)
    self.assertIsInstance(dict_form["snapshot_time"], str)

    # Round trip: the string form must convert back to the original value.
    self.assertEqual(
        to_datetime(dict_form["insertion_time"]), global_tag.insertion_time
    )
    self.assertEqual(
        to_datetime(dict_form["snapshot_time"]), global_tag.snapshot_time
    )
def test_get_tag_maps_in_global_tag(self):
    """
    Checks that tags() on the global tag named by self.global_tag_name
    yields a non-empty list made up of "globaltagmap" model instances.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    global_tag_maps = global_tag.tags()

    global_tag_maps_list = global_tag_maps.data()
    self.assertIsInstance(global_tag_maps_list, list)
    self.assertNotEqual(len(global_tag_maps_list), 0)

    gt_map_model = self.connection.models["globaltagmap"]
    for gt_map in global_tag_maps_list:
        self.assertIsInstance(gt_map, gt_map_model)
def test_get_tag_maps_in_global_tag_with_parameters(self):
    """
    Checks that tags(record="AlCaRecoTriggerBitsRcd") on the global tag
    named by self.global_tag_name yields a list with exactly 3 entries.
    """
    global_tag = self.connection.global_tag(name=self.global_tag_name)
    global_tag_maps_specific_record = global_tag.tags(record="AlCaRecoTriggerBitsRcd")

    gt_maps_spec_record_list = global_tag_maps_specific_record.data()
    self.assertIsInstance(gt_maps_spec_record_list, list)
    # NOTE(review): the expected count of 3 depends on the contents of the
    # database being queried - confirm it still holds for the target GT.
    self.assertEqual(len(gt_maps_spec_record_list), 3)
def test_get_all_iovs_within_range(self):
    """
    Requests the IOVs of the global tag named by self.global_tag_name with
    `since` restricted to [200000, 300000], and checks that the result is a
    list of "iov" model instances whose `since` lies inside those bounds.
    """
    # Bounds are named once so the query and the assertion cannot drift apart.
    lower_since, upper_since = 200000, 300000

    global_tag = self.connection.global_tag(name=self.global_tag_name)
    since_range = self.connection.range(lower_since, upper_since)
    iovs = global_tag.iovs(since=since_range)

    iovs_list = iovs.data()
    self.assertIsInstance(iovs_list, list)

    iov_model = self.connection.models["iov"]
    for iov in iovs_list:
        self.assertIsInstance(iov, iov_model)
        self.assertTrue(lower_since <= iov.since <= upper_since)
def test_gt_diff(self):
    """
    Computes the difference between global tags "74X_dataRun1_v1" and
    "74X_dataRun1_v3" both with the `-` operator and with diff(), and checks
    that the two results are lists of equal length in which every row holds
    different values under the "<name> Tag" keys of the two global tags.
    """
    gt1 = self.connection.global_tag(name="74X_dataRun1_v1")
    gt2 = self.connection.global_tag(name="74X_dataRun1_v3")
    difference_by_arithmetic = gt1 - gt2
    difference_by_method = gt1.diff(gt2)

    difference_arithmetic_list = difference_by_arithmetic.data()
    difference_method_list = difference_by_method.data()
    self.assertIsInstance(difference_arithmetic_list, list)
    self.assertIsInstance(difference_method_list, list)

    self.assertEqual(len(difference_arithmetic_list), len(difference_method_list))

    # The row keys depend only on the two global tag names, so build them once.
    gt1_key = "%s Tag" % gt1.name
    gt2_key = "%s Tag" % gt2.name
    # Lengths were asserted equal above, so zip drops no rows.
    for arithmetic_row, method_row in zip(difference_arithmetic_list, difference_method_list):
        self.assertNotEqual(arithmetic_row[gt1_key], arithmetic_row[gt2_key])
        self.assertNotEqual(method_row[gt1_key], method_row[gt2_key])
180 global_tag_name =
"74X_dataRun1_HLT_frozen_v2" 181 tag_name =
"AlCaRecoTriggerBits_MuonDQM_v1_hlt" 182 gt_map = self.connection.global_tag_map(global_tag_name=self.
global_tag_name, tag_name=tag_name)
183 self.assertTrue(isinstance(gt_map, self.connection.models[
"globaltagmap"]))
186 empty_gt_map = self.connection.global_tag_map()
187 self.assertTrue(isinstance(empty_gt_map, self.connection.models[
"globaltagmap"]))
188 self.assertTrue(empty_gt_map.empty)
198 tag_name =
"EBAlignment_measured_v01_express" 199 tag = self.connection.tag(name=tag_name)
202 self.assertTrue(tag.name !=
None)
203 self.assertTrue(isinstance(tag, self.connection.models[
"tag"]))
206 tag = self.connection.tag()
207 self.assertTrue(isinstance(tag, self.connection.models[
"tag"]))
208 self.assertTrue(tag.empty)
211 tag_name =
"EBAlignment_measured_v01_express" 212 tag = self.connection.tag(name=tag_name)
213 self.assertTrue(tag !=
None)
214 parent_global_tags = tag.parent_global_tags()
215 self.assertTrue(parent_global_tags !=
None)
218 empty_tag = self.connection.tag()
219 all_tags = empty_tag.all(amount=10)
220 self.assertTrue(all_tags !=
None)
222 self.assertTrue(len(all_tags.data()) != 0)
225 tag_name =
"EBAlignment_measured_v01_express" 226 empty_tag = self.connection.tag(name=tag_name)
227 all_tags = empty_tag.all(amount=10)
228 self.assertTrue(all_tags !=
None)
230 self.assertTrue(len(all_tags.data()) != 0)
239 tag_name =
"EBAlignment_measured_v01_express" 240 tag = self.connection.tag(name=tag_name)
243 raw_list = iovs.data()
244 self.assertTrue(isinstance(raw_list, list))
245 first_iov = raw_list[0]
246 self.assertTrue(isinstance(first_iov, self.connection.models[
"iov"]))
249 tag_name =
"EBAlignment_measured_v01_express" 250 iovs = self.connection.iov(tag_name=tag_name)
252 raw_list = iovs.data()
253 self.assertTrue(isinstance(raw_list, list))
254 first_iov = raw_list[0]
255 self.assertTrue(isinstance(first_iov, self.connection.models[
"iov"]))
258 empty_iov = self.connection.iov()
259 self.assertTrue(isinstance(empty_iov, self.connection.models[
"iov"]))
260 self.assertTrue(empty_iov.empty)
270 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9" 271 payload = self.connection.payload(hash=payload_hash)
272 self.assertTrue(isinstance(payload, self.connection.models[
"payload"]))
275 payload = self.connection.payload()
276 self.assertTrue(isinstance(payload, self.connection.models[
"payload"]))
277 self.assertTrue(payload.empty)
280 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9" 281 payload = self.connection.payload(hash=payload_hash)
282 self.assertTrue(payload !=
None)
283 parent_tags = payload.parent_tags()
284 self.assertTrue(parent_tags !=
None)
287 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9" 288 payload = self.connection.payload(hash=payload_hash)
289 self.assertTrue(payload !=
None)
290 parent_tags = payload.parent_tags()
300 string_for_non_empty_result =
"ecal" 301 data = self.connection.search_everything(string_for_non_empty_result)
302 self.assertTrue(len(data.get(
"global_tags").
data()) != 0)
303 self.assertTrue(len(data.get(
"tags").
data()) != 0)
304 self.assertTrue(len(data.get(
"payloads").
data()) != 0)
313 tags = self.connection.tag(time_type=
"Run")
314 self.assertTrue(len(tags.data()) > 1)
317 tag = self.connection.tag()
318 self.assertTrue(tag.empty)
321 tag = self.connection.tag(name=
"dfasdf")
322 self.assertTrue(tag ==
None)
325 self.connection.tear_down()
def test_all_tags_non_empty_tag(self)
def factory_tests(querying_tests)
def test_factory_no_result(self)
def test_all_tags_empty_tag(self)
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
std::string print(const Track &, edm::Verbosity=edm::Concise)
Track print utility.
def test_factory_empty_result(self)
def test_get_empty_global_tag_map(self)
def test_get_empty_tag(self)
def test_get_parent_tags_payload(self)
PixelRecoRange< float > Range
def test_get_global_tag_map(self)
def test_get_parent_global_tags(self)
def to_datetime(date_string)
def test_get_payload(self)
def test_type_parent_tags(self)
def test_get_empty_iov(self)
def test_factory_multiple_results(self)
def test_get_iovs_by_iov_query(self)
def test_get_empty_payload(self)
char data[epos_bytes_allocation]
def global_tag_tests(querying_tests)
def test_check_connection(self)
def test_search_everything(self)