4 Joshua Dawes - CERN, CMS - The University of Manchester
6 This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.
13 from datetime
import datetime
14 from urllib.parse
import urlencode
20 from .url_query
import url_query
23 from .
import data_sources
24 from .
import querying
26 from .utils
import to_timestamp, to_datetime, friendly_since
30 Takes a since and, if it is Run-based expressed as Lumi-based, returns the run number.
31 Otherwise, returns the since without transformations.
33 if time_type ==
"Run" and (since & 0xffffff) == 0:
def log(file_handle, message):
    """
    Append a single timestamped entry to an already-open log file.

    file_handle -- object exposing a write() method; it is only written to,
                   never flushed or closed here.
    message     -- text placed after the timestamp.

    The entry has the form "[<timestamp>] <message>" followed by a newline.
    The timestamp is produced by to_timestamp() from the current UTC time
    (its exact textual format is decided by that helper).
    """
    # Build the stamp first so the formatted entry is assembled in one place.
    stamp = to_timestamp(datetime.utcnow())
    entry = "[%s] %s\n" % (stamp, message)
    file_handle.write(entry)
47 Find a new client-side log file name.
49 Note: This cannot use the upload session token since logs need to be written before this is opened.
50 However, this can be changed so that the filename that uses the token is written to once
55 log_files = [file
for file
in os.listdir(os.path.join(os.getcwd(),
"upload_logs"))
if "upload_log" in file]
56 new_id = len(log_files)+1
61 Used to control output to the console and to the client-side log.
64 def __init__(self, log_handle=None, verbose=False):
70 def write(self, message="", ignore_verbose=False):
72 Write to the console and to the log file held by self.
83 Upload session controller - creates, tracks, and deletes upload sessions on the server.
86 def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/
", **kwargs):
89 Given an SQLite file and a Metadata source, reads into a dictionary ready for it to be encoded and uploaded.
91 Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
93 Note: default value of service_url should be changed for production.
110 self._outputter.write(
"Using server instance at '%s'." % self.
_SERVICE_URL)
113 if metadata_source ==
None:
115 self.
exit_upload(
"A source of metadata must be given so CondDBFW knows how to upload conditions.")
122 if self.metadata_source.get(
"destinationTags") ==
None:
125 if type(self.metadata_source.get(
"destinationTags")) == dict
and list(self.metadata_source.get(
"destinationTags").
keys())[0] ==
None:
129 if self.metadata_source.get(
"destinationDatabase") ==
None:
130 self.
exit_upload(
"No destination database was given.")
133 if self.metadata_source.get(
"sourceDB") ==
None and self.metadata_source.get(
"hashToUse") ==
None:
135 If we have neither an sqlite file nor the command line data
137 self.
exit_upload(
"You must give either an SQLite database file, or the necessary command line arguments to replace one."\
138 +
"\nSee --help for command line argument information.")
139 elif self.metadata_source.get(
"sourceDB") !=
None:
141 We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
142 We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
151 result_dictionary = {}
157 self._outputter.write(
"Getting Tag and IOVs from SQLite database.")
160 tag = sqlite_con.tag(name=self.
input_tag)
162 self.
exit_upload(
"The source Tag '%s' you gave was not found in the SQLite file." % self.
input_tag)
163 tag = tag.as_dicts(convert_timestamps=
True)
166 iovs = sqlite_con.iov(tag_name=self.
input_tag)
169 iovs = iovs.as_dicts(convert_timestamps=
True)
170 iovs = [iovs]
if type(iovs) != list
else iovs
173 Finally, get the list of all Payload hashes of IOVs,
174 then compute the list of hashes for which there is no Payload.
175 This is used later to decide if we can continue the upload if the Payload was not found on the server.
177 iovs_for_hashes = sqlite_con.iov(tag_name=self.
input_tag)
179 hashes_of_iovs = iovs_for_hashes.get_members(
"payload_hash").
data()
181 hashes_of_iovs = [iovs_for_hashes.payload_hash]
185 sqlite_con.close_session()
187 elif metadata_source.data().get(
"hashToUse") !=
None:
189 Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
190 We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
194 result_dictionary = {}
202 "insertion_time" : now}]
210 if tag[
"time_type"] ==
"Run":
211 for (i, iov)
in enumerate(iovs):
212 iovs[i][
"since"] = iovs[i][
"since"] << 32
214 result_dictionary = {
"inputTagData" : tag,
"iovs" : iovs}
218 result_dictionary.update(metadata_source.data())
224 if result_dictionary.get(
"since") ==
None:
225 result_dictionary[
"since"] = sorted(iovs, key=
lambda iov : iov[
"since"])[0][
"since"]
226 elif self.
data_to_send[
"inputTagData"][
"time_type"] ==
"Run":
231 TODO - Settle on a single destination tag format.
236 if type(result_dictionary[
"destinationTags"]) == dict:
237 self._outputter.write(
"WARNING: Multiple destination tags in a single metadata source is deprecated.")
238 except Exception
as e:
239 self._outputter.write(
"ERROR: %s" %
str(e))
245 response = request.send()
250 Checks the decoded response of an HTTP request to the server.
251 If it is a dictionary, and one of its keys is "error", the server returned an error
254 if type(response_dict) == dict
and "error" in list(response_dict.keys()):
255 splitter_string =
"\n%s\n" % (
"-"*50)
256 self._outputter.write(
"\nERROR: %s" % splitter_string, ignore_verbose=
True)
257 self._outputter.write(response_dict[
"error"], ignore_verbose=
True)
262 self._outputter.write(
"\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=
True)
263 if response_dict.get(
"traceback") !=
None:
264 self._outputter.write(response_dict[
"traceback"], ignore_verbose=
True)
266 self._outputter.write(
"No traceback was returned from the server.", ignore_verbose=
True)
268 self._outputter.write(
"Use the --debug option to show the traceback of this error.", ignore_verbose=
True)
279 elif not(
"error" in list(response_dict.keys()))
and "log_data" in list(response_dict.keys()):
281 self.
_log_data = response_dict[
"log_data"]
286 Given the log data from the server, write it to a client-side log file.
290 if not(os.path.exists(os.path.join(os.getcwd(),
"server_side_logs/"))):
291 os.makedirs(
"server_side_logs/")
294 server_log_file_name =
None
301 handle = open(server_log_file_name,
"w")
302 handle.write(base64.b64decode(log_data))
304 except Exception
as e:
306 server_log_file_name =
None
311 if server_log_file_name !=
None:
312 print(
"Log file from server written to '%s'." % server_log_file_name)
314 print(
"No server log file could be written locally.")
320 Used to exit the script - which only happens if an error has occurred.
321 If the --testing flag was passed by the user, we should return False for failure, and not exit
334 print(
"\n%s\n" % message)
342 Calls methods that send HTTP requests to the upload server.
346 Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
359 self._outputter.write(
"Upload session obtained with token '%s'." % self.
upload_session_id)
363 return self.
exit_upload(
"Ran out of retries opening an upload session, where the limit was 3.")
364 except Exception
as e:
366 self._outputter.write(traceback.format_exc(), ignore_verbose=
True)
369 self._outputter.write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
371 self._outputter.write(
"Something went wrong that isn't handled by code - the traceback is above.")
376 Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
381 Filtering the IOVs before we send them by getting the First Conditions Safe Run
382 from the server based on the target synchronization type.
384 if self.
data_to_send[
"inputTagData"][
"time_type"] !=
"Time":
391 return self.
exit_upload(
"Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
392 except Exception
as e:
394 self._outputter.write(traceback.format_exc(), ignore_verbose=
True)
397 self._outputter.write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
399 self._outputter.write(
"Something went wrong that isn't handled by code - the traceback is above.")
403 self._outputter.write(
"The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
406 Check for the hashes that the server doesn't have - only send these (but in the next step).
420 all_hashes = [iov[
"payload_hash"]
for iov
in self.
data_to_send[
"iovs"]]
421 hashes_not_found = check_hashes_response[
"hashes_not_found"]
422 hashes_found = list(set(all_hashes) - set(hashes_not_found))
423 self._outputter.write(
"Checking for IOVs that have no Payload locally or on the server.")
425 for hash_not_found
in hashes_not_found:
427 return self.
exit_upload(
"IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
429 for hash_found
in hashes_found:
431 self._outputter.write(
"Payload with hash %s on server, so can upload IOV." % hash_found)
433 self._outputter.write(
"All IOVs either come with Payloads or point to a Payload already on the server.")
437 return self.
exit_upload(
"Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
438 except Exception
as e:
440 self._outputter.write(traceback.format_exc(), ignore_verbose=
True)
443 self._outputter.write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
445 self._outputter.write(
"Something went wrong that isn't handled by code - the traceback is above.")
450 Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
451 exception handling is done inside this method, since it calls a method itself for each payload.
454 if self.
_testing and not(send_payloads_response):
458 Final stage - send metadata to server (since the payloads are there now)
459 if this is successful, once it has finished, the upload session is closed on the server and the tag lock is released.
474 return self.
exit_upload(
"Ran out of retries trying to send metadata, where the limit was 3.")
475 except Exception
as e:
477 self._outputter.write(traceback.format_exc(), ignore_verbose=
True)
480 self._outputter.write(
"Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
482 self._outputter.write(
"Something went wrong that isn't handled by code - the traceback is above.")
496 Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
497 as long as the upload session is still open.
499 self._outputter.write(
"Getting upload session.")
505 body_data = base64.b64encode(json.dumps(
513 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"]}
516 response = query.send()
522 Close an upload session on the server by calling its close_upload_session end-point.
523 This is done if there is an error on the client-side.
525 self._outputter.write(
"An error occurred - closing the upload session on the server.")
526 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
528 response = query.send()
534 Execute the HTTPs request to ask the server for the FCSR.
536 Note: we do this in a separate function so we can do the decoding check for json data with check_response.
544 "upload_session_id" : upload_session_id,
547 "tier0_response" : self.data_to_send.get(
"tier0_response")
550 result = query.send()
555 Ask the server for the FCSR based on the synchronization type of the source Tag.
556 Then, modify the IOVs (possibly remove some) based on the FCSR we received.
557 This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
559 self._outputter.write(
"Getting the First Condition Safe Run for the current sync type.")
562 fcsr = fcsr_data[
"fcsr"]
563 fcsr_changed = fcsr_data[
"fcsr_changed"]
564 new_sync = fcsr_data[
"new_sync"]
567 self._outputter.write(
"Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.
data_to_send[
"fcsr_filter"], new_sync))
569 self._outputter.write(
"Synchronization '%s' gave FCSR %d for FCSR Filtering."\
573 There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
574 Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
575 Note: this applies to run, lumi and timestamp run_types.
581 if self.
data_to_send[
"fcsr_filter"].lower() ==
"offline":
582 self._outputter.write(
"If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
586 self._outputter.write(
"Final FCSR after comparison with FCSR received from server is %d."\
590 Post validation processing assuming destination since is now valid.
592 Because we don't have an sqlite database to query (everything's in a dictionary),
593 we have to go through the IOVs manually to find the greatest since that's less than
594 the destination since.
596 Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
599 max_since_below_dest = self.
data_to_send[
"iovs"][0][
"since"]
602 max_since_below_dest = self.
data_to_send[
"iovs"][i][
"since"]
614 self.
data_to_send[
"iovs"][i][
"insertion_time"] = new_time
618 Get all the hashes from the dictionary of IOVs we have from the SQLite file.
620 self._outputter.write(
"\tGetting list of all hashes found in SQLite database.")
621 hashes = [iov[
"payload_hash"]
for iov
in self.
data_to_send[
"iovs"]]
627 Get the hashes of the payloads we want to send that the server doesn't have yet.
629 self._outputter.write(
"Getting list of hashes that the server does not have Payloads for, to send to server.")
631 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
633 response = query.send()
638 Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
643 self._outputter.write(
"No hashes to send - moving to metadata upload.")
646 self._outputter.write(
"Sending payloads of hashes not found:")
650 self._outputter.write(
"\tConnecting to input SQLite database.")
654 self._outputter.write(
"\tGetting Payloads from SQLite database based on list of hashes.")
655 byte_hashes = [bytes(h,
'utf-8')
for h
in hashes]
656 payloads = con.payload(hash=byte_hashes)
670 dicts = payloads.as_dicts()
671 self._outputter.write(
"Uploading Payload BLOBs:")
674 for n, payload
in enumerate(dicts):
675 self._outputter.write(
"\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload[
"hash"]))
676 response = self.
send_blob(payload, upload_session_id)
681 self._outputter.write(
"\tPayload sent - moving to next one.")
682 self._outputter.write(
"All Payloads uploaded.")
690 Send the BLOB of a payload over HTTP.
691 The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
694 blob_data = base64.b64encode(payload[
"data"])
696 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id}
699 for key
in list(payload.keys()):
702 if key ==
"insertion_time":
705 url_data[key] = payload[key]
712 request_response = request.send()
713 return request_response
714 except Exception
as e:
717 self._outputter.write(
"\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload[
"hash"])
718 self._outputter.write(
"Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload[
"hash"])
719 return json.dumps({
"error" :
str(e),
"traceback" : traceback.format_exc()})
724 Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
725 The server closes the session (and releases the tag lock) after processing has been completed.
732 self._outputter.write(
"Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
736 url_data = {
"database" : self.
data_to_send[
"destinationDatabase"],
"upload_session_id" : upload_session_id,
"tier0_response" : self.data_to_send.get(
"tier0_response")}
738 response = request.send()
739 self._outputter.write(
"Response received - conditions upload process complete.")
742 if __name__ ==
"__main__":
744 This code should only be executed for testing.
747 from .uploadConditions
import parse_arguments
751 This code should only be executed for testing.
752 Any uploads done by the user should be done by calling the uploadConditions.py script.
753 See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
759 upload_metadata[
"sqlite_file"] = upload_metadata.get(
"sourceDB")
762 upload_metadata_argument = {}
763 for (key, value)
in list(upload_metadata.items()):
764 if key !=
"metadata_source":
765 upload_metadata_argument[key] = value
771 result = upload_controller.upload()
hashes_with_no_local_payload
def get_upload_session_id
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
def check_response_for_error_key
def write_server_side_log
char data[epos_bytes_allocation]