3 This file contains all testing suites to be used to test the framework.
5 Note: avoid checking for lengths of lists, or characteristics of data from db.
6 The db may change + this shouldn't cause tests to fail.
8 TODO: Change code so that all connections are used when testing queries - if this isn't too bad a thing to do with the DBs.
17 import CondCore.Utilities.CondDBFW.querying
as querying
18 import CondCore.Utilities.CondDBFW.data_sources
as data_sources
19 import CondCore.Utilities.CondDBFW.data_formats
as data_formats
20 import CondCore.Utilities.CondDBFW.shell
as shell
21 from CondCore.Utilities.CondDBFW
import querying_framework_api
as api
23 secrets_file =
"/afs/cern.ch/cms/DB/conddb/.cms_cond/netrc"
28 connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
36 tag_name =
"EBAlignment_measured_v01_express"
37 tag = self.connection.tag(name=tag_name)
39 self.assertTrue(tag !=
None)
41 self.assertTrue(tag.name !=
None)
42 self.assertEqual(tag.__class__.__name__.lower(),
"tag")
45 tag = self.connection.tag()
46 self.assertTrue(tag !=
None)
47 self.assertEqual(tag.__class__.__name__.lower(),
"tag")
51 global_tag_name =
"74X_dataRun1_HLT_frozen_v2"
52 global_tag = self.connection.global_tag(name=global_tag_name)
53 self.assertTrue(global_tag !=
None)
54 self.assertTrue(global_tag.name !=
None)
55 self.assertTrue(global_tag.tags() !=
None)
56 self.assertEqual(global_tag.__class__.__name__.lower(),
"globaltag")
60 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9"
61 payload = self.connection.payload(hash=payload_hash)
62 self.assertTrue(payload !=
None)
63 self.assertEqual(payload.__class__.__name__.lower(),
"payload")
66 payload = self.connection.payload()
67 self.assertTrue(payload !=
None)
68 self.assertEqual(payload.__class__.__name__.lower(),
"payload")
71 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9"
72 payload = self.connection.payload(hash=payload_hash)
73 self.assertTrue(payload !=
None)
74 parent_tags = payload.parent_tags()
75 self.assertTrue(parent_tags !=
None)
78 tag_name =
"EBAlignment_measured_v01_express"
79 tag = self.connection.tag(name=tag_name)
80 self.assertTrue(tag !=
None)
81 parent_global_tags = tag.parent_global_tags()
82 self.assertTrue(parent_global_tags !=
None)
85 empty_tag = self.connection.tag()
86 all_tags = empty_tag.all(10)
87 self.assertTrue(all_tags !=
None)
89 self.assertTrue(len(all_tags.data()) != 0)
92 tag_name =
"EBAlignment_measured_v01_express"
93 empty_tag = self.connection.tag(name=tag_name)
94 all_tags = empty_tag.all(10)
95 self.assertTrue(all_tags !=
None)
97 self.assertTrue(len(all_tags.data()) != 0)
100 empty_gt = self.connection.global_tag()
101 all_gts = empty_gt.all(10)
102 self.assertTrue(all_gts !=
None)
103 self.assertTrue(len(all_gts.data()) != 0)
106 string_for_non_empty_result =
"ecal"
107 data = self.connection.search_everything(string_for_non_empty_result)
108 self.assertTrue(len(data.get(
"global_tags").
data()) != 0)
109 self.assertTrue(len(data.get(
"tags").
data()) != 0)
110 self.assertTrue(len(data.get(
"payloads").
data()) != 0)
113 tags = self.connection.tag(time_type=
"Run")
114 self.assertTrue(len(tags.data()) > 1)
117 tag = self.connection.tag(name=
"EBAlignment_hlt")
118 self.assertEqual(tag.__class__.__name__.lower(),
"tag")
121 tag = self.connection.tag()
122 self.assertTrue(tag.empty)
125 tag = self.connection.tag(name=
"dfasdf")
126 self.assertTrue(tag ==
None)
129 self.connection.close_session()
134 connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
138 test_list =
list(range(0, 10))
140 self.assertTrue(json_list_object !=
None)
141 self.assertEqual(json_list_object.data(), test_list)
142 for n
in range(0, len(test_list)):
143 self.assertEqual(json_list_object.get(n).
data(), test_list[n])
146 test_dict = {
"key1" :
"value1",
"key2" :
"value2",
"key3" :
"value3"}
148 self.assertTrue(json_dict_object !=
None)
149 self.assertEqual(json_dict_object.data(), test_dict)
150 for key
in test_dict:
151 self.assertEqual(json_dict_object.get(key).
data(), test_dict[key])
154 structure = {
"key1" : [{
"a" : 1,
"b" : 3}, {
"a" : 4,
"b" : 8}],
"key2" : [
"header1",
"header2",
"header3"]}
156 self.assertEqual(json_structure_object.get(
"key1").
data(), structure[
"key1"])
157 self.assertEqual(json_structure_object.get(
"key2").
data(), structure[
"key2"])
160 structure = {
"key1" : [{
"a" : 1,
"b" : 3}, {
"a" : 4,
"b" : 8}],
"key2" : [
"header1",
"header2",
"header3"]}
162 new_structure.add_key([],
"key1")
163 new_structure.get(
"key1").add_child({
"a" : 1,
"b" : 3})
164 new_structure.get(
"key1").add_child({
"a" : 4,
"b" : 8})
165 new_structure.add_key([],
"key2")
166 new_structure.get(
"key2").add_child(
"header1")
167 new_structure.get(
"key2").add_child(
"header2")
168 new_structure.get(
"key2").add_child(
"header3")
169 self.assertEqual(new_structure.data(), structure)
172 test_list =
list(range(0, 10))
173 test_dict = {
"key1" :
"value1",
"key2" :
"value2",
"key3" :
"value3"}
176 self.assertEqual(json_list_object.__class__.__name__,
"json_list")
177 self.assertEqual(json_dict_object.__class__.__name__,
"json_dict")
180 all_tags = self.connection.tag().
all(10)
181 self.assertEqual(all_tags.__class__.__name__,
"json_list")
184 tag_name =
"EBAlignment_measured_v01_express"
185 tag = self.connection.tag(name=tag_name)
186 self.assertTrue(tag !=
None)
187 parent_gts = tag.parent_global_tags()
188 self.assertEqual(parent_gts.__class__.__name__,
"json_list")
191 payload_hash =
"00172cd62d8abae41915978d815ae62cc08ad8b9"
192 payload = self.connection.payload(hash=payload_hash)
193 self.assertTrue(payload !=
None)
194 parent_tags = payload.parent_tags()
195 self.assertEqual(parent_tags.__class__.__name__,
"json_list")
198 global_tag_name =
"74X_dataRun1_HLT_frozen_v2"
199 global_tag = self.connection.global_tag(name=global_tag_name)
200 self.assertTrue(global_tag !=
None)
201 iovs = global_tag.iovs(valid=
True)
204 global_tag_name =
"74X_dataRun1_HLT_frozen_v2"
205 global_tag = self.connection.global_tag(name=global_tag_name)
206 self.assertTrue(global_tag !=
None)
207 iovs = global_tag.iovs(valid=
True)
208 self.assertEqual(iovs.__class__.__name__,
"json_list")
209 snapshot_time = global_tag.snapshot_time
211 insertion_time_as_datetime = datetime.datetime.strptime(iov.insertion_time,
'%Y-%m-%d %H:%M:%S,%f')
212 self.assertTrue(insertion_time_as_datetime < snapshot_time)
215 self.connection.close_session()
220 connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
224 tags = self.connection.tag().
all(10)
226 self.assertEqual(list_of_dicts.__class__.__name__,
"json_list")
228 self.assertEqual(tag.__class__.__name__.lower(),
"tag")
231 models_to_test = map(self.connection.model, [
"global_tag",
"tag",
"payload",
"iov"])
232 for model
in models_to_test:
233 model_name = self.connection.class_name_to_column(model).lower()
237 self.assertTrue(dicts !=
None)
238 for obj
in orm_objects:
239 self.assertEqual(self.connection.class_name_to_column(obj.__class__).lower(), model_name)
240 headers = model.headers
241 for header
in headers:
243 test = getattr(obj, header)
246 print(
"'%s' doesn't exist." % header)
247 header_exists =
False
248 self.assertTrue(header_exists)
251 self.connection.close_session()
256 connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
260 connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
262 self.assertTrue(connection !=
None)
265 self.connection.close_session()
273 connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
275 self.assertTrue(connection !=
None)
277 close_session_result = connection.close_session()
278 self.assertEqual(close_session_result,
True)
280 """def test_oradev_connect(self):
281 connection_data = {"db_alias" : "oradev", "host" : "oracle", "schema" : "", "secrets" : secrets_file_1}
282 connection = querying.connect(connection_data)
283 self.assertTrue(connection != None)
284 # can cause failure if the database is down
285 close_session_result = connection.close_session()
286 self.assertEqual(close_session_result, True)"""
289 for alias
in [
"pro",
"arc",
"int",
"dev"]:
290 connection_data = {
"db_alias" : alias,
"host" :
"frontier",
"schema" :
"cms_conditions"}
293 close_session_result = connection.close_session()
294 self.assertEqual(close_session_result,
True)
297 connection_data = {
"db_alias" :
"pro",
"host" :
"frontier",
"schema" :
"cms_conditions"}
300 tag = connection.tag(name=
"EBAlignment_measured_v06_offline")
302 close_session_result = connection.close_session()
303 self.assertEqual(close_session_result,
True)
308 class script_tests(unittest.TestCase):
311 self.
connection_data = {
"db_alias" :
"orapro",
"host" :
"oracle",
"schema" :
"cms_conditions",
"secrets" : secrets_file}
316 def script(self_instance, connection):
317 tag = self.connection.tag(name=
"EBAlignment_hlt")
318 valid_iovs = tag.iovs()
321 data = api_obj.run_script(script(), output=
False)
323 self.assertEqual(data.get(0).
data().payload_hash,
"1480c559bbbdacfec514c3cbcf2eb978403efd74")
327 @data_formats.objects_to_dicts
328 def script(self_instance, connection):
329 tag = self.connection.tag(name=
"EBAlignment_hlt")
330 valid_iovs = tag.iovs()
333 data = api_obj.run_script(script(), output=
False)
335 self.assertEqual(data.get(0,
"payload_hash").
data(),
"1480c559bbbdacfec514c3cbcf2eb978403efd74")
338 self.connection.close_session()
std::string print(const Track &, edm::Verbosity=edm::Concise)
Track print utility.
def test_get_parent_tags_payload
def tests_all_tags_empty_tag
def test_validity_of_iovs_global_tag
def test_get_empty_payload
def test_get_parent_global_tags
def test_frontier_connect
def test_search_everything
def test_type_parent_global_tags
def test_type_iovs_of_global_tag
def test_factory_empty_result
def tests_all_tags_non_empty_tag
def test_script_with_decorator
def test_type_parent_tags
def tests_all_global_tags_empty_gt
def test_factory_single_result
def test_factory_no_result
char data[epos_bytes_allocation]
def test_factory_multiple_results
def test_check_connection
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run