"""
Joshua Dawes - CERN, CMS - The University of Manchester

This module holds classes to help with uploading conditions to the drop box web service,
which also uses CondDBFW to read and write data.
"""

from __future__ import print_function

import os
import json
import base64
import traceback
from datetime import datetime
from urllib import urlencode

import data_sources
import errors
import querying
# check_response is the decorator used below to JSON-decode and validate server responses
from errors import check_response
from url_query import url_query
from utils import to_timestamp, to_datetime, friendly_since
# Note: this local definition shadows the friendly_since imported from utils above.
def friendly_since(time_type, since):
    """
    Takes a since and, if it is Run-based expressed as Lumi-based, returns the run number.
    Otherwise, returns the since without transformations.
    """
    if time_type == "Run" and (since & 0xffffff) == 0:
        return since >> 32
    else:
        return since
def log(file_handle, message):
    """
    Very simple logging function, used by output class.
    """
    file_handle.write("[%s] %s\n" % (to_timestamp(datetime.now()), message))
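# Illustrative usage sketch (not part of the original module): log() expects an
# already-open file handle; the file name below is hypothetical.
def _demo_log():
    with open("upload_logs/upload_log_demo", "a") as handle:
        log(handle, "demo message")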
def new_log_file_id():
    """
    Find a new client-side log file name.

    Note: This cannot use the upload session token, since logs need to be written before
    this is opened. However, this can be changed so that the file name that uses the token
    is written to once it is obtained.
    """
    log_files = [file for file in os.listdir(os.path.join(os.getcwd(), "upload_logs")) if "upload_log" in file]
    new_id = len(log_files) + 1
    return new_id
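# Illustrative sketch (not part of the original module): mapping the new id to the
# next client-side log file name; the exact naming pattern is an assumption based
# on the "upload_log" prefix filtered for above.
def _demo_next_log_file_name():
    return os.path.join("upload_logs", "upload_log_%d" % new_log_file_id())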
class output():
    """
    Used to control output to the console and to the client-side log.
    """

    def __init__(self, log_handle=None, verbose=False):
        self._verbose = verbose
        self._log_handle = log_handle

    def write(self, message="", ignore_verbose=False):
        """
        Write to the console and to the log file held by self.
        """
        if self._verbose or ignore_verbose:
            print(message)
        if self._log_handle != None:
            log(self._log_handle, message)


class uploader(object):
    """
    Upload session controller - creates, tracks, and deletes upload sessions on the server.
    """

    def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False,
                 server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
        """
        Given an SQLite file and a metadata source, reads everything into a dictionary
        ready for it to be encoded and uploaded.

        Note: kwargs is used to capture stray arguments - arguments that do not match
        keywords will not be used.

        Note: the default value of server should be changed for production.
        """
        self._debug = debug
        self._verbose = verbose
        self._testing = testing
        self.upload_session_id = None
        # ... (a client-side log handle is normally opened and attached here as well)
        self._outputter = output(verbose=verbose)

        self._SERVICE_URL = server
        self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)
        # validate the metadata source before anything is sent to the server
        if metadata_source == None:
            self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
        else:
            self.metadata_source = metadata_source.data()
            # the source Tag name; the "inputTag" key is an assumption based on the
            # command-line interface of uploadConditions.py
            self.input_tag = self.metadata_source.get("inputTag")

        # a destination Tag, a destination database, and a source of conditions
        # (an SQLite file or a payload hash) must all have been supplied
        if self.metadata_source.get("destinationTags") == None:
            self.exit_upload("No destination Tag was given.")
        else:
            if isinstance(self.metadata_source.get("destinationTags"), dict) \
                    and self.metadata_source.get("destinationTags").keys()[0] == None:
                self.exit_upload("No destination Tag was given.")

        if self.metadata_source.get("destinationDatabase") == None:
            self.exit_upload("No destination database was given.")

        if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
            # we have neither an SQLite file nor the command-line data
            self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
                + "\nSee --help for command line argument information.")
        elif self.metadata_source.get("sourceDB") != None:
            """
            We've been given an SQLite file, so try to extract Conditions Metadata based on
            that and the Upload Metadata in metadata_source.
            We now extract the Tag and IOV data from SQLite. It is added to the dictionary
            for sending over HTTPs later.
            """
            result_dictionary = {}

            self._outputter.write("Getting Tag and IOVs from SQLite database.")

            # open a CondDBFW connection on the SQLite file given on the command line
            self.sqlite_file_name = self.metadata_source["sourceDB"]
            sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))

            tag = sqlite_con.tag(name=self.input_tag)
            if tag == None:
                self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
            tag = tag.as_dicts(convert_timestamps=True)

            iovs = sqlite_con.iov(tag_name=self.input_tag)
            iovs = iovs.as_dicts(convert_timestamps=True)
            iovs = [iovs] if not isinstance(iovs, list) else iovs

            """
            Finally, get the list of all Payload hashes of IOVs, then compute the list of
            hashes for which there is no Payload - this is used later to decide if we can
            continue the upload if a Payload was not found on the server.
            """
            iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
            if iovs_for_hashes.__class__ == data_sources.json_list:
                hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
            else:
                hashes_of_iovs = [iovs_for_hashes.payload_hash]

            # compute the hashes with no local Payload - the upload can only continue
            # if these already exist on the server
            self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs
                                                 if sqlite_con.payload(hash=payload_hash) == None]

            sqlite_con.close_session()
        elif metadata_source.data().get("hashToUse") != None:
            """
            Assume we've been given metadata in the command line (since no sqlite file is
            there, and we have command line arguments).
            We now use Tag and IOV data from the command line. It is added to the dictionary
            for sending over HTTPs later.
            """
            result_dictionary = {}
            now = to_timestamp(datetime.now())

            # ... (a Tag dictionary and a single-IOV list are built here from the
            # command-line arguments, the IOV ending with)
            iovs = [{ # ...
                "insertion_time" : now}]

            # the hash given on the command line refers to a Payload that exists only on
            # the server, so it has no local Payload by construction
            self.hashes_with_no_local_payload = [metadata_source.data()["hashToUse"]]

            # Run-based sinces are packed into the upper 32 bits for transport
            if tag["time_type"] == "Run":
                for (i, iov) in enumerate(iovs):
                    iovs[i]["since"] = iovs[i]["since"] << 32

        result_dictionary = {"inputTagData" : tag, "iovs" : iovs}

        # add the upload metadata given on the command line to the data we're sending
        result_dictionary.update(metadata_source.data())
        # everything CondDBFW sends to the server lives in this dictionary
        self.data_to_send = result_dictionary

        # if the user didn't give a since, default it to the lowest since of the IOVs;
        # otherwise, pack a user-given Run-based since for transport as well
        if result_dictionary.get("since") == None:
            result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
        elif self.data_to_send["inputTagData"]["time_type"] == "Run":
            self.data_to_send["since"] = self.data_to_send["since"] << 32
        """
        TODO - Settle on a single destination tag format.
        """
        try:
            if isinstance(result_dictionary["destinationTags"], dict):
                self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
        except Exception as e:
            self._outputter.write("ERROR: %s" % str(e))

    @check_response(check="json")
    def get_tag_dictionary(self):
        # the end-point name here follows the method name, matching the other end-points below
        url_data = {"tag_name" : self.metadata_source["destinationTags"].keys()[0], "database" : self.metadata_source["destinationDatabase"]}
        request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
        response = request.send()
        return response
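    # Illustrative sketch (not part of the original module): "destinationTags" may
    # arrive either as a plain tag name or as the deprecated dictionary keyed by tag
    # name that the warning above refers to. A hypothetical normaliser to the
    # single-tag dictionary form:
    @staticmethod
    def _normalise_destination_tags(destination_tags):
        if isinstance(destination_tags, dict):
            # deprecated multi-tag form - keep only the first destination tag
            return {list(destination_tags.keys())[0] : {}}
        return {destination_tags : {}}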
    def check_response_for_error_key(self, response_dict, exit_if_error=True):
        """
        Checks the decoded response of an HTTP request to the server.
        If it is a dictionary, and one of its keys is "error", the server returned an error.
        """
        if isinstance(response_dict, dict) and "error" in response_dict.keys():
            splitter_string = "\n%s\n" % ("-"*50)

            # the error message is shown no matter the verbosity level
            self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
            self._outputter.write(response_dict["error"], ignore_verbose=True)

            # if the user ran with --debug, show the traceback the server sent back;
            # otherwise, tell them how to get it
            if self._debug:
                self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
                if response_dict.get("traceback") != None:
                    self._outputter.write(response_dict["traceback"], ignore_verbose=True)
                else:
                    self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
            else:
                self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)

            if exit_if_error:
                return self.exit_upload()
            else:
                return False
        elif not("error" in response_dict.keys()) and "log_data" in response_dict.keys():
            # no error, and the server sent back its log - keep it for the local copy
            self._log_data = response_dict["log_data"]
            return True
    def write_server_side_log(self, log_data):
        """
        Given the log data from the server, write it to a client-side log file.
        """
        # create the server_side_logs directory if it doesn't exist - without it
        # we can't write the log
        if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
            os.makedirs("server_side_logs/")

        # server_log_file_name stays None unless the file is successfully written
        server_log_file_name = None
        try:
            # the log file is named after the upload session token (see send_metadata)
            log_file = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
            handle = open(log_file, "w")
            handle.write(base64.b64decode(log_data))
            handle.close()
            server_log_file_name = log_file
        except Exception as e:
            traceback.print_exc()
            server_log_file_name = None

        if server_log_file_name != None:
            print("Log file from server written to '%s'." % server_log_file_name)
        else:
            print("No server log file could be written locally.")
    def exit_upload(self, message=None):
        """
        Used to exit the script - which only happens if an error has occurred.
        If the --testing flag was passed by the user, we should return False for failure,
        and not exit.
        """
        # ... (if an upload session is open, it is closed on the server here first)

        if message != None:
            print("\n%s\n" % message)

        if self._testing:
            return False
        else:
            exit()
    def upload(self):
        """
        Calls methods that send HTTP requests to the upload server.
        """

        """
        Open an upload session on the server - this also gives us a tag lock on the tag
        being uploaded, if it is available.
        """
        try:
            upload_session_data = self.get_upload_session_id()
            # the key holding the token in the server response is an assumption here
            self.upload_session_id = upload_session_data["id"]
            self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
        # NoMoreRetriesException is assumed to live in CondDBFW's errors module; it
        # signals that a query's retry limit was hit
        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for
            if self._verbose:
                self._outputter.write(traceback.format_exc(), ignore_verbose=True)
            if not(self._verbose):
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
            return False
        """
        Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
        """
        if self.data_to_send.get("fcsr_filter") != None:
            """
            Filtering the IOVs before we send them by getting the First Conditions Safe Run
            from the server based on the target synchronization type.
            """
            if self.data_to_send["inputTagData"]["time_type"] != "Time":
                # Run- or Lumi-based Tags can be validated against the FCSR
                try:
                    self.filter_iovs_by_fcsr(self.upload_session_id)
                except errors.NoMoreRetriesException as no_more_retries:
                    return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
                except Exception as e:
                    if self._verbose:
                        self._outputter.write(traceback.format_exc(), ignore_verbose=True)
                    if not(self._verbose):
                        self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
                    else:
                        self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
                    return False
            else:
                self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
        """
        Check for the hashes that the server doesn't have - only send these (but in the next step).
        """
        try:
            check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
            all_hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
            hashes_not_found = check_hashes_response["hashes_not_found"]
            hashes_found = list(set(all_hashes) - set(hashes_not_found))

            self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
            # a hash that isn't on the server and has no local Payload either is fatal
            for hash_not_found in hashes_not_found:
                if hash_not_found in self.hashes_with_no_local_payload:
                    return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)

            for hash_found in hashes_found:
                if hash_found in self.hashes_with_no_local_payload:
                    self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)

            self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")
        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
        except Exception as e:
            if self._verbose:
                self._outputter.write(traceback.format_exc(), ignore_verbose=True)
            if not(self._verbose):
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
            return False
        """
        Send the payloads the server told us about in the previous step (returned from
        get_hashes_to_send); exception handling is done inside this method, since it
        calls a method itself for each payload.
        """
        send_payloads_response = self.send_payloads(hashes_not_found, self.upload_session_id)
        if self._testing and not(send_payloads_response):
            return False

        """
        Final stage - send metadata to server (since the payloads are there now).
        If this is successful, once it has finished, the upload session is closed on the
        server and the tag lock is released.
        """
        try:
            response = self.send_metadata(self.upload_session_id)
            no_error = self.check_response_for_error_key(response)
            if not(no_error) and self._testing:
                return False
            # write the log file sent back by the server
            self.write_server_side_log(self._log_data)
        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
        except Exception as e:
            if self._verbose:
                self._outputter.write(traceback.format_exc(), ignore_verbose=True)
            if not(self._verbose):
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
            return False

        # all stages completed without error
        return True
    @check_response(check="json")
    def get_upload_session_id(self):
        """
        Open an upload session on the server, and get a unique token back that we can use
        to authenticate all future requests, as long as the upload session is still open.
        """
        self._outputter.write("Getting upload session.")

        # the credentials are sent in the request body, so they are encrypted over HTTPS
        body_data = base64.b64encode(json.dumps(
                {
                    # ... (the destination Tag and the user's credentials go here)
                }
            ))

        url_data = {"database" : self.data_to_send["destinationDatabase"]}

        # the end-point name here is an assumption, following the naming of the others
        query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
        response = query.send()
        return response
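    # Illustrative sketch (not part of the original module): request bodies are sent
    # as base64-encoded JSON, which the server decodes before processing. The keys
    # below are hypothetical.
    @staticmethod
    def _demo_encode_body():
        body = {"username_or_token" : "user", "password" : "secret"}
        encoded = base64.b64encode(json.dumps(body))
        assert json.loads(base64.b64decode(encoded)) == body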
    @check_response(check="json")
    def close_upload_session(self, upload_session_id):
        """
        Close an upload session on the server by calling its close_upload_session end-point.
        This is done if there is an error on the client-side.
        """
        self._outputter.write("An error occurred - closing the upload session on the server.")
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
        response = query.send()
        return response
    @check_response(check="json")
    def get_fcsr_from_server(self, upload_session_id):
        """
        Execute the HTTPs request to ask the server for the FCSR.

        Note: we do this in a separate function so we can do the decoding check for
        json data with check_response.
        """
        url_data = {
            "database" : self.data_to_send["destinationDatabase"],
            "upload_session_id" : upload_session_id,
            # ... (the destination Tag and synchronization type are sent as well)
        }
        # the end-point name here is an assumption, following the naming of the others
        query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
        result = query.send()
        return result
    def filter_iovs_by_fcsr(self, upload_session_id):
        """
        Ask the server for the FCSR based on the synchronization type of the source Tag.
        Then, modify the IOVs (possibly remove some) based on the FCSR we received.
        This is useful in the case that most IOVs have different payloads, and our FCSR
        is close to the end of the range the IOVs cover.
        """
        self._outputter.write("Getting the First Condition Safe Run for the current sync type.")

        fcsr_data = self.get_fcsr_from_server(upload_session_id)
        fcsr = fcsr_data["fcsr"]
        fcsr_changed = fcsr_data["fcsr_changed"]
        new_sync = fcsr_data["new_sync"]

        if fcsr_changed:
            self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))

        self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
                        % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))

        """
        There may be cases where this assumption is not correct (that we can reassign
        since if fcsr > since).
        Only set since to the fcsr from the server if the fcsr is further along than
        the since the user is trying to upload to.
        Note: this applies to run, lumi and timestamp run_types.
        """
        if fcsr > self.data_to_send["since"]:
            # uploads to offline synchronization must have since >= FCSR, so report an error
            if self.data_to_send["fcsr_filter"].lower() == "offline":
                self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
                self.exit_upload()
            else:
                self.data_to_send["since"] = fcsr

        self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
                        % friendly_since(self.data_to_send["inputTagData"]["time_type"], self.data_to_send["since"]))

        """
        Post validation processing assuming destination since is now valid.

        Because we don't have an sqlite database to query (everything's in a dictionary),
        we have to go through the IOVs manually to find the greatest since that's less
        than the destination since.

        Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr
        without leaving a hole in the Conditions coverage.
        """
        max_since_below_dest = self.data_to_send["iovs"][0]["since"]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if iov["since"] <= self.data_to_send["since"] and iov["since"] > max_since_below_dest:
                max_since_below_dest = self.data_to_send["iovs"][i]["since"]

        # drop IOVs below the greatest since found, and pull the remaining sinces up
        # to the destination since
        self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if iov["since"] < self.data_to_send["since"]:
                self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]

        # the IOVs that survived filtering get a fresh insertion time
        new_time = to_timestamp(datetime.now())
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            self.data_to_send["iovs"][i]["insertion_time"] = new_time
"\tGetting list of all hashes found in SQLite database.")
618 hashes =
map(
lambda iov : iov[
"payload_hash"], self.
data_to_send[
"iovs"])
    @check_response(check="json")
    def get_hashes_to_send(self, upload_session_id):
        """
        Get the hashes of the payloads we want to send that the server doesn't have yet.
        """
        self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
        # the local hashes travel in the request body; the end-point name is an
        # assumption, following the naming of the others
        post_data = json.dumps(self.get_all_hashes())
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
        response = query.send()
        return response
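    # Illustrative sketch (not part of the original module): upload() partitions the
    # local hashes using the server's reply - only payloads the server doesn't already
    # have are sent, and a hash missing on both sides is fatal.
    @staticmethod
    def _demo_partition_hashes(all_hashes, hashes_not_found, hashes_with_no_local_payload):
        hashes_found = list(set(all_hashes) - set(hashes_not_found))
        fatal = [h for h in hashes_not_found if h in hashes_with_no_local_payload]
        to_send = [h for h in hashes_not_found if h not in hashes_with_no_local_payload]
        return hashes_found, to_send, fatal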
    def send_payloads(self, hashes, upload_session_id):
        """
        Send a list of payloads corresponding to hashes we got from the SQLite file and
        filtered by asking the server.
        """
        # if we have no hashes, we can't send anything - but don't exit, since it might
        # just mean all the Payloads are already on the server
        if len(hashes) == 0:
            self._outputter.write("No hashes to send - moving to metadata upload.")
            return True
        else:
            self._outputter.write("Sending payloads of hashes not found:")

            # create a CondDBFW connection that maps BLOBs, since we need to fetch
            # the payload BLOB data (mapping is off by default)
            self._outputter.write("\tConnecting to input SQLite database.")
            con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name), map_blobs=True)

            self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
            payloads = con.payload(hash=hashes)
            dicts = payloads.as_dicts()
            self._outputter.write("Uploading Payload BLOBs:")

            # send each payload over HTTP, checking each response for errors
            for n, payload in enumerate(dicts):
                self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
                response = self.send_blob(payload, upload_session_id)
                no_error = self.check_response_for_error_key(response, exit_if_error=False)
                if not(no_error):
                    return False
                self._outputter.write("\tPayload sent - moving to next one.")

            self._outputter.write("All Payloads uploaded.")
            return True
    def send_blob(self, payload, upload_session_id):
        """
        Send the BLOB of a payload over HTTP.
        The BLOB is put in the request body, so no additional processing has to be done
        on the server side, apart from decoding from base64.
        """
        # encode the BLOB data of the Payload, so that no characters in it can
        # interfere with the HTTPs request
        blob_data = base64.b64encode(payload["data"])

        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}

        # the rest of the payload's fields travel as URL parameters
        for key in payload.keys():
            # skip the BLOB itself
            if key != "data":
                if key == "insertion_time":
                    url_data[key] = to_timestamp(payload[key])
                else:
                    url_data[key] = payload[key]

        # the end-point name here is an assumption, following the naming of the others
        request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)

        try:
            request_response = request.send()
            return request_response
        except Exception as e:
            # don't exit on a failed payload - report it, then return the error and
            # traceback in the same shape as a server error response
            self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
            self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
            return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})
    @check_response(check="json")
    def send_metadata(self, upload_session_id):
        """
        Final part of the upload process - send the Conditions metadata (Tag, IOVs -
        not upload metadata).
        The server closes the session (and releases the tag lock) after processing has
        been completed.
        """
        self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
                        % self.upload_session_id)

        # the metadata dictionary travels, base64-encoded, in the request body;
        # the end-point name is an assumption, following the naming of the others
        metadata = base64.b64encode(json.dumps(self.data_to_send))
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=metadata)
        response = request.send()
        self._outputter.write("Response received - conditions upload process complete.")
        return response
if __name__ == "__main__":
    """
    This code should only be executed for testing.
    """
    from uploadConditions import parse_arguments

    print(
"""
This code should only be executed for testing.
Any uploads done by the user should be done by calling the uploadConditions.py script.
See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
"""
    )

    upload_metadata = parse_arguments()

    upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB")

    # copy everything except "metadata_source" into a new dictionary, which then
    # becomes the metadata source itself
    upload_metadata_argument = {}
    for (key, value) in upload_metadata.items():
        if key != "metadata_source":
            upload_metadata_argument[key] = value

    upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument)

    upload_controller = uploader(**upload_metadata)

    result = upload_controller.upload()