4 Joshua Dawes - CERN, CMS - The University of Manchester
6 This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.
9 from __future__
import print_function
10 from __future__
import absolute_import
15 from datetime
import datetime
16 from urllib
import urlencode
22 from .url_query
import url_query
25 from .
import data_sources
26 from .
import querying
28 from .utils
import to_timestamp, to_datetime, friendly_since
32 Takes a since and, if it is Run-based expressed as Lumi-based, returns the run number.
33 Otherwise, returns the since without transformations.
35 if time_type ==
"Run" and (since & 0xffffff) == 0:
def log(file_handle, message):
	"""
	Very simple logging function, used by output class.

	Writes a single timestamped line to the given open file handle.
	Note: does not flush or close the handle — the caller owns its lifecycle.
	"""
	# to_timestamp comes from .utils (imported at module top) and renders
	# datetime.now() as the log line's timestamp prefix.
	file_handle.write("[%s] %s\n" % (to_timestamp(datetime.now()), message))
49 Find a new client-side log file name.
51 Note: This cannot use the upload session token since logs need to be written before this is opened.
52 However, this can be changed so that the filename that uses the token is written to once
57 log_files = [file
for file
in os.listdir(os.path.join(os.getcwd(),
"upload_logs"))
if "upload_log" in file]
58 new_id = len(log_files)+1
69 Used to control output to the console and to the client-side log.
72 def __init__(self, log_handle=None, verbose=False, debug=False):
78 self.
labels = [
"INFO",
"ERROR",
"WARNING",
"VERBOSE",
"DEBUG"]
80 def write(self, message="", level=INFO):
82 Write to the console and to the log file held by self.
84 message =
"[%s] %s: %s"%(datetime.now(), self.
labels[level], message)
86 if level == output.DEBUG
and self.
_debug:
88 elif level < output.DEBUG:
91 if level == output.DEBUG:
93 elif level <= output.ERROR:
100 Upload session controller - creates, tracks, and deletes upload sessions on the server.
103 def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/
", **kwargs):
106 Given an SQLite file and a Metadata source, reads into a dictionary ready for it to be encoded and uploaded.
108 Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
110 Note: default value of service_url should be changed for production.
130 if metadata_source ==
None:
132 self.
exit_upload(
"A source of metadata must be given so CondDBFW knows how to upload conditions.")
147 self.
exit_upload(
"No destination database was given.")
152 If we have neither an sqlite file nor the command line data
154 self.
exit_upload(
"You must give either an SQLite database file, or the necessary command line arguments to replace one."\
155 +
"\nSee --help for command line argument information.")
158 We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
159 We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
168 result_dictionary = {}
174 self.
_outputter.
write(
"Getting Tag and IOVs from SQLite database.", output.VERBOSE)
177 tag = sqlite_con.tag(name=self.
input_tag)
179 self.
exit_upload(
"The source Tag '%s' you gave was not found in the SQLite file." % self.
input_tag)
180 tag = tag.as_dicts(convert_timestamps=
True)
183 iovs = sqlite_con.iov(tag_name=self.
input_tag)
186 iovs = iovs.as_dicts(convert_timestamps=
True)
187 iovs = [iovs]
if not isinstance(iovs, list)
else iovs
190 Finally, get the list of all Payload hashes of IOVs,
191 then compute the list of hashes for which there is no Payload for
192 this is used later to decide if we can continue the upload if the Payload was not found on the server.
194 iovs_for_hashes = sqlite_con.iov(tag_name=self.
input_tag)
196 hashes_of_iovs = iovs_for_hashes.get_members(
"payload_hash").
data()
198 hashes_of_iovs = [iovs_for_hashes.payload_hash]
202 sqlite_con.close_session()
204 elif metadata_source.data().get(
"hashToUse") !=
None:
206 Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
207 We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
211 result_dictionary = {}
219 "insertion_time" : now}]
227 if tag[
"time_type"] ==
"Run":
228 for (i, iov)
in enumerate(iovs):
229 iovs[i][
"since"] = iovs[i][
"since"] << 32
231 result_dictionary = {
"inputTagData" : tag,
"iovs" : iovs}
235 result_dictionary.update(metadata_source.data())
241 if result_dictionary.get(
"since") ==
None:
242 result_dictionary[
"since"] = sorted(iovs, key=
lambda iov : iov[
"since"])[0][
"since"]
243 elif self.
data_to_send[
"inputTagData"][
"time_type"] ==
"Run":
251 response = request.send()
256 Checks the decoded response of an HTTP request to the server.
257 If it is a dictionary, and one of its keys is "error", the server returned an error
260 if isinstance(response_dict, dict)
and "error" in response_dict.keys():
261 splitter_string =
"\n%s\n" % (
"-"*50)
268 self.
_outputter.
write(
"\nTRACEBACK (since --debug is set):%s" % splitter_string, output.DEBUG)
269 if response_dict.get(
"traceback") !=
None:
272 self.
_outputter.
write(
"No traceback was returned from the server.", output.DEBUG)
274 self.
_outputter.
write(
"Use the --debug option to show the traceback of this error.", output.INFO)
285 elif not(
"error" in response_dict.keys())
and "log_data" in response_dict.keys():
287 self.
_log_data = response_dict[
"log_data"][2:-1]
292 Given the log data from the server, write it to a client-side log file.
296 if not(os.path.exists(os.path.join(os.getcwd(),
"server_side_logs/"))):
297 os.makedirs(
"server_side_logs/")
300 server_log_file_name =
None
307 handle = open(server_log_file_name,
"w")
308 handle.write(base64.b64decode(log_data))
310 except Exception
as e:
312 server_log_file_name =
None
317 if self.
_SERVICE_URL.startswith(
"https://cms-conddb-dev.cern.ch/cmsDbCondUpload"):
318 logUrl =
"https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prep/%s"%self.
upload_session_id
320 logUrl =
"https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prod/%s"%self.
upload_session_id
322 print(
"[%s] INFO: Server log found at %s." % (datetime.now(), logUrl))
323 if server_log_file_name !=
None:
324 print(
"[%s] INFO: Local copy of server log file at '%s'." % (datetime.now(), server_log_file_name))
326 print(
"No server log file could be written locally.")
332 Used to exit the script - which only happens if an error has occurred.
333 If the --testing flag was passed by the user, we should return False for failure, and not exit
346 print(
"\n%s\n" % message)
354 Calls methods that send HTTP requests to the upload server.
358 Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
375 return self.
exit_upload(
"Ran out of retries opening an upload session, where the limit was 3.")
376 except Exception
as e:
381 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
383 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
388 Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
393 Filtering the IOVs before we send them by getting the First Conditions Safe Run
394 from the server based on the target synchronization type.
396 if self.
data_to_send[
"inputTagData"][
"time_type"] !=
"Time":
403 return self.
exit_upload(
"Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
404 except Exception
as e:
409 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
411 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
415 self.
_outputter.
write(
"The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
418 Check for the hashes that the server doesn't have - only send these (but in the next step).
432 all_hashes =
map(
lambda iov : iov[
"payload_hash"], self.
data_to_send[
"iovs"])
433 hashes_not_found = check_hashes_response[
"hashes_not_found"]
434 hashes_found = list(set(all_hashes) - set(hashes_not_found))
435 self.
_outputter.
write(
"Checking for IOVs that have no Payload locally or on the server.", output.VERBOSE)
437 for hash_not_found
in hashes_not_found:
439 return self.
exit_upload(
"IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
441 for hash_found
in hashes_found:
443 self.
_outputter.
write(
"Payload with hash %s on server, so can upload IOV." % hash_found, output.VERBOSE)
445 self.
_outputter.
write(
"Found %i Payloads in remote server" % len(hashes_found), output.INFO)
446 self.
_outputter.
write(
"Found %i Payloads not in remote server" % len(hashes_not_found), output.INFO)
448 self.
_outputter.
write(
"All IOVs either come with Payloads or point to a Payload already on the server.", output.VERBOSE)
452 return self.
exit_upload(
"Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
453 except Exception
as e:
458 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
460 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
465 Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
466 exception handling is done inside this method, since it calls a method itself for each payload.
469 if self.
_testing and not(send_payloads_response):
473 Final stage - send metadata to server (since the payloads are there now)
474 if this is successful, once it finished the upload session is closed on the server and the tag lock is released.
495 return self.
exit_upload(
"Ran out of retries trying to send metadata, where the limit was 3.")
496 except Exception
as e:
501 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
503 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
517 Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
518 as long as the upload session is still open.
526 body_data = base64.b64encode(json.dumps(
534 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"]}
537 response = query.send()
543 Close an upload session on the server by calling its close_upload_session end-point.
544 This is done if there is an error on the client-side.
546 self.
_outputter.
write(
"An error occurred - closing the upload session on the server.")
547 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
549 response = query.send()
555 Execute the HTTPs request to ask the server for the FCSR.
557 Note: we do this in a separate function so we can do the decoding check for json data with check_response.
562 "upload_session_id" : upload_session_id,
567 result = query.send()
572 Ask for the server for the FCSR based on the synchronization type of the source Tag.
573 Then, modify the IOVs (possibly remove some) based on the FCSR we received.
574 This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
576 self.
_outputter.
write(
"Getting the First Condition Safe Run for the current sync type.")
579 fcsr = fcsr_data[
"fcsr"]
580 fcsr_changed = fcsr_data[
"fcsr_changed"]
581 new_sync = fcsr_data[
"new_sync"]
584 self.
_outputter.
write(
"Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.
data_to_send[
"fcsr_filter"], new_sync))
586 self.
_outputter.
write(
"Synchronization '%s' gave FCSR %d for FCSR Filtering."\
590 There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
591 Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
592 Note: this applies to run, lumi and timestamp run_types.
598 if self.
data_to_send[
"fcsr_filter"].lower() ==
"offline":
599 self.
_outputter.
write(
"If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
603 self.
_outputter.
write(
"Final FCSR after comparison with FCSR received from server is %d."\
607 Post validation processing assuming destination since is now valid.
609 Because we don't have an sqlite database to query (everything's in a dictionary),
610 we have to go through the IOVs manually find the greatest since that's less than
611 the destination since.
613 Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
616 max_since_below_dest = self.
data_to_send[
"iovs"][0][
"since"]
619 max_since_below_dest = self.
data_to_send[
"iovs"][i][
"since"]
631 self.
data_to_send[
"iovs"][i][
"insertion_time"] = new_time
635 Get all the hashes from the dictionary of IOVs we have from the SQLite file.
637 self.
_outputter.
write(
"\tGetting list of all hashes found in SQLite database.", output.DEBUG)
638 hashes =
map(
lambda iov : iov[
"payload_hash"], self.
data_to_send[
"iovs"])
639 self.
_outputter.
write(
"Found %i local Payload(s) referenced in IOVs"%len(hashes), output.INFO)
645 Get the hashes of the payloads we want to send that the server doesn't have yet.
647 self.
_outputter.
write(
"Getting list of hashes that the server does not have Payloads for, to send to server.", output.DEBUG)
649 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
651 response = query.send()
656 Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
672 self.
_outputter.
write(
"\tGetting Payloads from SQLite database based on list of hashes.")
673 payloads = con.payload(hash=hashes)
687 dicts = payloads.as_dicts()
691 for n, payload
in enumerate(dicts):
692 self.
_outputter.
write(
"\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload[
"hash"]))
693 response = self.
send_blob(payload, upload_session_id)
707 Send the BLOB of a payload over HTTP.
708 The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
711 blob_data = base64.b64encode(payload[
"data"])
713 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
716 for key
in payload.keys():
719 if key ==
"insertion_time":
722 url_data[key] = payload[key]
729 request_response = request.send()
730 return request_response
731 except Exception
as e:
734 self.
_outputter.
write(
"\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload[
"hash"])
735 self.
_outputter.
write(
"Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload[
"hash"])
736 return json.dumps({
"error" :
str(e),
"traceback" : traceback.format_exc()})
741 Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
742 The server closes the session (and releases the tag lock) after processing has been completed.
749 self.
_outputter.
write(
"Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
753 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
755 response = request.send()
756 self.
_outputter.
write(
"Response received - conditions upload process complete.", output.VERBOSE)
759 if __name__ ==
"__main__":
761 This code should only be executed for testing.
764 from .uploadConditions
import parse_arguments
768 This code should only be executed for testing.
769 Any uploads done by the user should be done by calling the uploadConditions.py script.
770 See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
776 upload_metadata[
"sqlite_file"] = upload_metadata.get(
"sourceDB")
779 upload_metadata_argument = {}
780 for (key, value)
in upload_metadata.items():
781 if key !=
"metadata_source":
782 upload_metadata_argument[key] = value
786 upload_controller =
uploader(**upload_metadata)
788 result = upload_controller.upload()