4 Joshua Dawes - CERN, CMS - The University of Manchester
6 This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.
9 from __future__
import print_function
10 from __future__
import absolute_import
15 from datetime
import datetime
16 from urllib
import urlencode
22 from .url_query
import url_query
25 from .
import data_sources
26 from .
import querying
28 from .utils
import to_timestamp, to_datetime, friendly_since
32 Takes a since and, if it is Run-based expressed as Lumi-based, returns the run number.
33 Otherwise, returns the since without transformations.
35 if time_type ==
"Run" and (since & 0xffffff) == 0:
41 def log(file_handle, message):
43 Very simple logging function, used by output class.
45 file_handle.write(
"[%s] %s\n" % (
to_timestamp(datetime.now()), message))
49 Find a new client-side log file name.
51 Note: This cannot use the upload session token since logs need to be written before this is opened.
52 However, this can be changed so that the filename that uses the token is written to once
57 log_files = [file
for file
in os.listdir(os.path.join(os.getcwd(),
"upload_logs"))
if "upload_log" in file]
58 new_id = len(log_files)+1
63 Used to control output to the console and to the client-side log.
66 def __init__(self, log_handle=None, verbose=False):
72 def write(self, message="", ignore_verbose=False):
74 Write to the console and to the log file held by self.
85 Upload session controller - creates, tracks, and deletes upload sessions on the server.
88 def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/
", **kwargs):
91 Given an SQLite file and a Metadata sources, reads into a dictionary read for it to be encoded and uploaded.
93 Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
95 Note: default value of service_url should be changed for production.
115 if metadata_source ==
None:
117 self.
exit_upload(
"A source of metadata must be given so CondDBFW knows how to upload conditions.")
132 self.
exit_upload(
"No destination database was given.")
137 If we have neither an sqlite file nor the command line data
139 self.
exit_upload(
"You must give either an SQLite database file, or the necessary command line arguments to replace one."\
140 +
"\nSee --help for command line argument information.")
143 We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
144 We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
153 result_dictionary = {}
162 tag = sqlite_con.tag(name=self.
input_tag)
164 self.
exit_upload(
"The source Tag '%s' you gave was not found in the SQLite file." % self.
input_tag)
165 tag = tag.as_dicts(convert_timestamps=
True)
168 iovs = sqlite_con.iov(tag_name=self.
input_tag)
171 iovs = iovs.as_dicts(convert_timestamps=
True)
172 iovs = [iovs]
if not isinstance(iovs, list)
else iovs
175 Finally, get the list of all Payload hashes of IOVs,
176 then compute the list of hashes for which there is no Payload for
177 this is used later to decide if we can continue the upload if the Payload was not found on the server.
179 iovs_for_hashes = sqlite_con.iov(tag_name=self.
input_tag)
181 hashes_of_iovs = iovs_for_hashes.get_members(
"payload_hash").
data()
183 hashes_of_iovs = [iovs_for_hashes.payload_hash]
187 sqlite_con.close_session()
189 elif metadata_source.data().get(
"hashToUse") !=
None:
191 Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
192 We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
196 result_dictionary = {}
204 "insertion_time" : now}]
212 if tag[
"time_type"] ==
"Run":
213 for (i, iov)
in enumerate(iovs):
214 iovs[i][
"since"] = iovs[i][
"since"] << 32
216 result_dictionary = {
"inputTagData" : tag,
"iovs" : iovs}
220 result_dictionary.update(metadata_source.data())
226 if result_dictionary.get(
"since") ==
None:
227 result_dictionary[
"since"] = sorted(iovs, key=
lambda iov : iov[
"since"])[0][
"since"]
228 elif self.
data_to_send[
"inputTagData"][
"time_type"] ==
"Run":
233 TODO - Settle on a single destination tag format.
238 if isinstance(result_dictionary[
"destinationTags"], dict):
239 self.
_outputter.
write(
"WARNING: Multiple destination tags in a single metadata source is deprecated.")
240 except Exception
as e:
247 response = request.send()
252 Checks the decoded response of an HTTP request to the server.
253 If it is a dictionary, and one of its keys is "error", the server returned an error
256 if isinstance(response_dict, dict)
and "error" in response_dict.keys():
257 splitter_string =
"\n%s\n" % (
"-"*50)
258 self.
_outputter.
write(
"\nERROR: %s" % splitter_string, ignore_verbose=
True)
264 self.
_outputter.
write(
"\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=
True)
265 if response_dict.get(
"traceback") !=
None:
268 self.
_outputter.
write(
"No traceback was returned from the server.", ignore_verbose=
True)
270 self.
_outputter.
write(
"Use the --debug option to show the traceback of this error.", ignore_verbose=
True)
281 elif not(
"error" in response_dict.keys())
and "log_data" in response_dict.keys():
283 self.
_log_data = response_dict[
"log_data"]
288 Given the log data from the server, write it to a client-side log file.
292 if not(os.path.exists(os.path.join(os.getcwd(),
"server_side_logs/"))):
293 os.makedirs(
"server_side_logs/")
296 server_log_file_name =
None
303 handle = open(server_log_file_name,
"w")
304 handle.write(base64.b64decode(log_data))
306 except Exception
as e:
308 server_log_file_name =
None
313 if server_log_file_name !=
None:
314 print(
"Log file from server written to '%s'." % server_log_file_name)
316 print(
"No server log file could be written locally.")
322 Used to exit the script - which only happens if an error has occurred.
323 If the --testing flag was passed by the user, we should return False for failure, and not exit
336 print(
"\n%s\n" % message)
344 Calls methods that send HTTP requests to the upload server.
348 Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
365 return self.
exit_upload(
"Ran out of retries opening an upload session, where the limit was 3.")
366 except Exception
as e:
371 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
373 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
378 Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
383 Filtering the IOVs before we send them by getting the First Conditions Safe Run
384 from the server based on the target synchronization type.
386 if self.
data_to_send[
"inputTagData"][
"time_type"] !=
"Time":
393 return self.
exit_upload(
"Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
394 except Exception
as e:
399 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
401 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
405 self.
_outputter.
write(
"The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
408 Check for the hashes that the server doesn't have - only send these (but in the next step).
422 all_hashes =
map(
lambda iov : iov[
"payload_hash"], self.
data_to_send[
"iovs"])
423 hashes_not_found = check_hashes_response[
"hashes_not_found"]
424 hashes_found =
list(set(all_hashes) - set(hashes_not_found))
425 self.
_outputter.
write(
"Checking for IOVs that have no Payload locally or on the server.")
427 for hash_not_found
in hashes_not_found:
429 return self.
exit_upload(
"IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
431 for hash_found
in hashes_found:
433 self.
_outputter.
write(
"Payload with hash %s on server, so can upload IOV." % hash_found)
435 self.
_outputter.
write(
"All IOVs either come with Payloads or point to a Payload already on the server.")
439 return self.
exit_upload(
"Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
440 except Exception
as e:
445 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
447 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
452 Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
453 exception handling is done inside this method, since it calls a method itself for each payload.
456 if self.
_testing and not(send_payloads_response):
460 Final stage - send metadata to server (since the payloads are there now)
461 if this is successful, once it finished the upload session is closed on the server and the tag lock is released.
476 return self.
exit_upload(
"Ran out of retries trying to send metadata, where the limit was 3.")
477 except Exception
as e:
482 self.
_outputter.
write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
484 self.
_outputter.
write(
"Something went wrong that isn't handled by code - the traceback is above.")
498 Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
499 as long as the upload session is still open.
507 body_data = base64.b64encode(json.dumps(
515 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"]}
518 response = query.send()
524 Close an upload session on the server by calling its close_upload_session end-point.
525 This is done if there is an error on the client-side.
527 self.
_outputter.
write(
"An error occurred - closing the upload session on the server.")
528 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
530 response = query.send()
536 Execute the HTTPs request to ask the server for the FCSR.
538 Note: we do this in a separate function we so we can do the decoding check for json data with check_response.
543 "upload_session_id" : upload_session_id,
548 result = query.send()
553 Ask for the server for the FCSR based on the synchronization type of the source Tag.
554 Then, modify the IOVs (possibly remove some) based on the FCSR we received.
555 This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
557 self.
_outputter.
write(
"Getting the First Condition Safe Run for the current sync type.")
560 fcsr = fcsr_data[
"fcsr"]
561 fcsr_changed = fcsr_data[
"fcsr_changed"]
562 new_sync = fcsr_data[
"new_sync"]
565 self.
_outputter.
write(
"Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.
data_to_send[
"fcsr_filter"], new_sync))
567 self.
_outputter.
write(
"Synchronization '%s' gave FCSR %d for FCSR Filtering."\
571 There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
572 Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
573 Note: this applies to run, lumi and timestamp run_types.
579 if self.
data_to_send[
"fcsr_filter"].lower() ==
"offline":
580 self.
_outputter.
write(
"If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
584 self.
_outputter.
write(
"Final FCSR after comparison with FCSR received from server is %d."\
588 Post validation processing assuming destination since is now valid.
590 Because we don't have an sqlite database to query (everything's in a dictionary),
591 we have to go through the IOVs manually find the greatest since that's less than
592 the destination since.
594 Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
597 max_since_below_dest = self.
data_to_send[
"iovs"][0][
"since"]
600 max_since_below_dest = self.
data_to_send[
"iovs"][i][
"since"]
612 self.
data_to_send[
"iovs"][i][
"insertion_time"] = new_time
616 Get all the hashes from the dictionary of IOVs we have from the SQLite file.
618 self.
_outputter.
write(
"\tGetting list of all hashes found in SQLite database.")
619 hashes =
map(
lambda iov : iov[
"payload_hash"], self.
data_to_send[
"iovs"])
625 Get the hashes of the payloads we want to send that the server doesn't have yet.
627 self.
_outputter.
write(
"Getting list of hashes that the server does not have Payloads for, to send to server.")
629 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
631 response = query.send()
636 Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
641 self.
_outputter.
write(
"No hashes to send - moving to metadata upload.")
652 self.
_outputter.
write(
"\tGetting Payloads from SQLite database based on list of hashes.")
653 payloads = con.payload(hash=hashes)
667 dicts = payloads.as_dicts()
671 for n, payload
in enumerate(dicts):
672 self.
_outputter.
write(
"\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload[
"hash"]))
673 response = self.
send_blob(payload, upload_session_id)
687 Send the BLOB of a payload over HTTP.
688 The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
691 blob_data = base64.b64encode(payload[
"data"])
693 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
696 for key
in payload.keys():
699 if key ==
"insertion_time":
702 url_data[key] = payload[key]
709 request_response = request.send()
710 return request_response
711 except Exception
as e:
714 self.
_outputter.
write(
"\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload[
"hash"])
715 self.
_outputter.
write(
"Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload[
"hash"])
716 return json.dumps({
"error" :
str(e),
"traceback" : traceback.format_exc()})
721 Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
722 The server closes the session (and releases the tag lock) after processing has been completed.
729 self.
_outputter.
write(
"Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
733 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
735 response = request.send()
736 self.
_outputter.
write(
"Response received - conditions upload process complete.")
739 if __name__ ==
"__main__":
741 This code should only be executed for testing.
744 from .uploadConditions
import parse_arguments
748 This code should only be executed for testing.
749 Any uploads done by the user should be done by calling the uploadConditions.py script.
750 See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
756 upload_metadata[
"sqlite_file"] = upload_metadata.get(
"sourceDB")
759 upload_metadata_argument = {}
760 for (key, value)
in upload_metadata.items():
761 if key !=
"metadata_source":
762 upload_metadata_argument[key] = value
766 upload_controller =
uploader(**upload_metadata)
768 result = upload_controller.upload()