
uploads.uploader Class Reference
Inheritance diagram for uploads.uploader (diagram not reproduced here).

Public Member Functions

def __init__ (self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs)
 
def check_response_for_error_key (self, response_dict, exit_if_error=True)
 
def close_upload_session (self, upload_session_id)
 
def exit_upload (self, message=None)
 
def filter_iovs_by_fcsr (self, upload_session_id)
 
def get_all_hashes (self)
 
def get_fcsr_from_server (self, upload_session_id)
 
def get_hashes_to_send (self, upload_session_id)
 
def get_tag_dictionary (self)
 
def get_upload_session_id (self)
 
def send_blob (self, payload, upload_session_id)
 
def send_metadata (self, upload_session_id)
 
def send_payloads (self, hashes, upload_session_id)
 
def upload (self)
 
def write_server_side_log (self, log_data)
 

Public Attributes

 data_to_send
 
 hashes_with_no_local_payload
 
 input_tag
 
 metadata_source
 
 server_side_log_file
 
 sqlite_file_name
 
 upload_log_file_name
 
 upload_session_id
 

Private Attributes

 _debug
 
 _handle
 
 _log_data
 
 _outputter
 
 _SERVICE_URL
 
 _testing
 
 _verbose
 

Detailed Description

Upload session controller - creates, tracks, and deletes upload sessions on the server.

Definition at line 98 of file uploads.py.
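To show how the pieces fit together, here is a minimal usage sketch. It is not the official entry point - uploadConditions.py normally builds the metadata source from its command-line arguments - and the DictSource wrapper, the import path, and all field values below are invented for illustration; any object exposing a data() method that returns the upload-metadata dictionary fits what the constructor expects.

# assuming the usual CMSSW package path for CondDBFW
from CondCore.Utilities.CondDBFW import uploads

# hypothetical stand-in for a CondDBFW data_source: the constructor only needs
# an object whose data() method returns the upload-metadata dictionary
class DictSource(object):
    def __init__(self, payload):
        self._payload = payload
    def data(self):
        return self._payload

metadata = DictSource({
    "sourceDB": "conditions.db",               # local SQLite file with Tag, IOVs and Payloads
    "inputTag": "ExampleTag_v1",               # Tag to read from the SQLite file
    "destinationTags": {"ExampleTag_v1": {}},  # Tag to write on the server
    "destinationDatabase": "oracle://cms_orcoff_prep/CMS_CONDITIONS",
    "since": 250000,                           # destination since (illustrative run number)
    "fcsr_filter": "prompt",                   # synchronization type used for FCSR filtering
    "userText": "",                            # filled with a default message if left empty
    "username": "someuser",                    # normally read from a netrc file
    "password": "***",
})

session = uploads.uploader(metadata_source=metadata, verbose=True)
session.upload()   # exits on error unless testing=True was passed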

Constructor & Destructor Documentation

◆ __init__()

def uploads.uploader.__init__ (   self,
  metadata_source = None,
  debug = False,
  verbose = False,
  testing = False,
  server = "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/",
**  kwargs 
)
Upload constructor:
Given an SQLite file and a metadata source, reads the data into a dictionary ready to be encoded and uploaded.

Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

Note: default value of service_url should be changed for production.

Definition at line 103 of file uploads.py.

103  def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs): """
104      Upload constructor:
105      Given an SQLite file and a metadata source, reads the data into a dictionary ready to be encoded and uploaded.
106  
107      Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
108  
109      Note: default value of service_url should be changed for production.
110      """
111      # set private variables
112      self._debug = debug
113      self._verbose = verbose
114      self._testing = testing
115      # initialise server-side log data as empty string - will be replaced when we get a response back from the server
116      self._log_data = ""
117      self._SERVICE_URL = server
118      self.upload_session_id = None
119  
120      # set up client-side log file
121      self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
122      self._handle = open(self.upload_log_file_name, "a")
123  
124      # set up client-side logging object
125      self._outputter = output(verbose=verbose, log_handle=self._handle, debug=self._debug)
126      self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)
127  
128      # expect a CondDBFW data_source object for metadata_source
129      if metadata_source == None:
130          # no upload metadata has been given - we cannot continue with the upload
131          self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
132      else:
133          # set up global metadata source variable
134          self.metadata_source = metadata_source.data()
135  
136      # check for the destination tag
137      # this is required whatever type of upload we're performing
138      if self.metadata_source.get("destinationTags") == None:
139          self.exit_upload("No destination Tag was given.")
140      else:
141          if isinstance(self.metadata_source.get("destinationTags"), dict) and self.metadata_source.get("destinationTags").keys()[0] == None:
142              self.exit_upload("No destination Tag was given.")
143  
144      # make sure a destination database was given
145      if self.metadata_source.get("destinationDatabase") == None:
146          self.exit_upload("No destination database was given.")
147  
148      # get Conditions metadata
149      if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
150          """
151          If we have neither an sqlite file nor the command line data
152          """
153          self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
154              + "\nSee --help for command line argument information.")
155      elif self.metadata_source.get("sourceDB") != None:
156          """
157          We've been given an SQLite file, so try to extract Conditions metadata based on that and the upload metadata in metadata_source.
158          We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPS later.
159          """
160  
161          # make sure we have an input tag to look for in the source db
162          self.input_tag = metadata_source.data().get("inputTag")
163          if self.input_tag == None:
164              self.exit_upload("No input Tag name was given.")
165  
166          # set empty dictionary to contain Tag and IOV data from SQLite
167          result_dictionary = {}
168          self.sqlite_file_name = self.metadata_source["sourceDB"]
169          if not(os.path.isfile(self.sqlite_file_name)):
170              self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
171          sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))
172  
173          self._outputter.write("Getting Tag and IOVs from SQLite database.", output.VERBOSE)
174  
175          # query for Tag, check for existence, then convert to dictionary
176          tag = sqlite_con.tag(name=self.input_tag)
177          if tag == None:
178              self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
179          tag = tag.as_dicts(convert_timestamps=True)
180  
181          # query for IOVs, check for existence, then convert to dictionaries
182          iovs = sqlite_con.iov(tag_name=self.input_tag)
183          if iovs == None:
184              self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
185          iovs = iovs.as_dicts(convert_timestamps=True)
186          iovs = [iovs] if not isinstance(iovs, list) else iovs
187  
188          """
189          Finally, get the list of all Payload hashes of the IOVs,
190          then compute the list of hashes for which there is no local Payload;
191          this is used later to decide if we can continue the upload if a Payload is not found on the server.
192          """
193          iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
194          if iovs_for_hashes.__class__ == data_sources.json_list:
195              hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
196          else:
197              hashes_of_iovs = [iovs_for_hashes.payload_hash]
198          self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]
199  
200          # close session open on SQLite database file
201          sqlite_con.close_session()
202  
203      elif metadata_source.data().get("hashToUse") != None:
204          """
205          Assume we've been given metadata on the command line (since no sqlite file is there, and we have command line arguments).
206          We now use Tag and IOV data from the command line. It is added to the dictionary for sending over HTTPS later.
207          """
208  
209          # set empty dictionary to contain Tag and IOV data from command line
210          result_dictionary = {}
211  
212          now = to_timestamp(datetime.now())
213          # tag dictionary will be taken from the server
214          # this does not require any authentication
215          tag = self.get_tag_dictionary()
216          self.check_response_for_error_key(tag)
217          iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
218              "insertion_time" : now}]
219  
220          # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
221          self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]
222  
223          # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side
224  
225          # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
226          if tag["time_type"] == "Run":
227              for (i, iov) in enumerate(iovs):
228                  iovs[i]["since"] = iovs[i]["since"] << 32
229  
230      result_dictionary = {"inputTagData" : tag, "iovs" : iovs}
231  
232      # add command line arguments to dictionary
233      # remembering that metadata_source is a json_dict object
234      result_dictionary.update(metadata_source.data())
235  
236      # store in instance variable
237      self.data_to_send = result_dictionary
238  
239      # if the since doesn't exist, take the first since from the list of IOVs
240      if result_dictionary.get("since") == None:
241          result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
242      elif self.data_to_send["inputTagData"]["time_type"] == "Run":
243          # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
244          self.data_to_send["since"] = self.data_to_send["since"] << 32

Member Function Documentation

◆ check_response_for_error_key()

def uploads.uploader.check_response_for_error_key (   self,
  response_dict,
  exit_if_error = True 
)
Checks the decoded response of an HTTP request to the server.
If it is a dictionary, and one of its keys is "error", the server returned an error

Definition at line 254 of file uploads.py.

254  def check_response_for_error_key(self, response_dict, exit_if_error=True):
255      """
256      Checks the decoded response of an HTTP request to the server.
257      If it is a dictionary, and one of its keys is "error", the server returned an error
258      """
259      # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
260      if isinstance(response_dict, dict) and "error" in response_dict.keys():
261          splitter_string = "\n%s\n" % ("-"*50)
262          self._outputter.write("\nERROR: %s" % splitter_string, output.ERROR)
263          self._outputter.write(response_dict["error"], output.ERROR)
264  
265          # if the user has given the --debug flag, show the traceback as well
266          if self._debug:
267              # suggest to the user to email this to db upload experts
268              self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, output.DEBUG)
269              if response_dict.get("traceback") != None:
270                  self._outputter.write(response_dict["traceback"], output.DEBUG)
271              else:
272                  self._outputter.write("No traceback was returned from the server.", output.DEBUG)
273          else:
274              self._outputter.write("Use the --debug option to show the traceback of this error.", output.INFO)
275  
276          # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
277          # if an error has occurred on the server side, a log will have been written
278          self.write_server_side_log(response_dict.get("log_data"))
279  
280          if exit_if_error:
281              if self._testing:
282                  return False
283              else:
284                  exit()
285      elif not("error" in response_dict.keys()) and "log_data" in response_dict.keys():
286          # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
287          self._log_data = response_dict["log_data"][2:-1]
288          return True

References EcalMatacqAnalyzer._debug, EcalABAnalyzer._debug, uploads.output._debug, EcalLaserAnalyzer2._debug, EcalLaserAnalyzer._debug, uploads.uploader._debug, uploads.uploader._log_data, uploads.uploader._outputter, uploads.uploader._testing, beamvalidation.exit(), writeEcalDQMStatus.write, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.exit_upload(), uploads.uploader.send_payloads(), and uploads.uploader.upload().
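For reference, a sketch of the two response shapes this check distinguishes; the keys mirror the code above, while the values are invented. The [2:-1] slice suggests the server sends the log as a stringified bytes literal, so the slice strips the surrounding b'...'.

# invented error response: "error" triggers the messages above, "traceback" is
# only shown when --debug is set, and "log_data" is written to a local file
error_response = {
    "error": "Tag 'ExampleTag_v1' is locked by another upload session.",
    "traceback": "Traceback (most recent call last): ...",
    "log_data": "b'PGJhc2U2NCBsb2c+'",
}

# invented success response: no "error" key, so the log data is only cached in
# self._log_data for later, after stripping the b'...' wrapper with [2:-1]
ok_response = {"log_data": "b'PGJhc2U2NCBsb2c+'"}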

◆ close_upload_session()

def uploads.uploader.close_upload_session (   self,
  upload_session_id 
)
Close an upload session on the server by calling its close_upload_session end-point.
This is done if there is an error on the client-side.

Definition at line 541 of file uploads.py.

541  def close_upload_session(self, upload_session_id):
542      """
543      Close an upload session on the server by calling its close_upload_session end-point.
544      This is done if there is an error on the client-side.
545      """
546      self._outputter.write("An error occurred - closing the upload session on the server.")
547      url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
548      query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
549      response = query.send()
550      return response

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, and writeEcalDQMStatus.write.

Referenced by uploads.uploader.exit_upload().

◆ exit_upload()

def uploads.uploader.exit_upload (   self,
  message = None 
)
Used to exit the script - which only happens if an error has occurred.
If the --testing flag was passed by the user, we should return False for failure, and not exit

Definition at line 330 of file uploads.py.

330  def exit_upload(self, message=None):
331      """
332      Used to exit the script - which only happens if an error has occurred.
333      If the --testing flag was passed by the user, we should return False for failure, and not exit
334      """
335      if self.upload_session_id != None:
336          # only try to close the upload session if an upload session has been obtained
337          response = self.close_upload_session(self.upload_session_id)
338          no_error = self.check_response_for_error_key(response)
339          # if no error was found in the upload session closure request,
340          # we still have to write the server side log
341          if no_error:
342              self.write_server_side_log(self._log_data)
343      # close client-side log handle
344      self._handle.close()
345      if message != None:
346          print("\n%s\n" % message)
347      if self._testing:
348          return False
349      else:
350          exit()

References uploads.uploader._handle, uploads.uploader._log_data, uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.close_upload_session(), beamvalidation.exit(), print(), uploads.uploader.upload_session_id, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.filter_iovs_by_fcsr().

◆ filter_iovs_by_fcsr()

def uploads.uploader.filter_iovs_by_fcsr (   self,
  upload_session_id 
)
Ask the server for the FCSR based on the synchronization type of the source Tag.
Then, modify the IOVs (possibly remove some) based on the FCSR we received.
This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.

Definition at line 570 of file uploads.py.

570  def filter_iovs_by_fcsr(self, upload_session_id):
571      """
572      Ask the server for the FCSR based on the synchronization type of the source Tag.
573      Then, modify the IOVs (possibly remove some) based on the FCSR we received.
574      This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
575      """
576      self._outputter.write("Getting the First Condition Safe Run for the current sync type.")
577  
578      fcsr_data = self.get_fcsr_from_server(upload_session_id)
579      fcsr = fcsr_data["fcsr"]
580      fcsr_changed = fcsr_data["fcsr_changed"]
581      new_sync = fcsr_data["new_sync"]
582  
583      if fcsr_changed:
584          self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))
585  
586      self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
587          % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))
588  
589      """
590      There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
591      Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
592      Note: this applies to run, lumi and timestamp run_types.
593      """
594  
595      # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
596      if fcsr > self.data_to_send["since"]:
597          # check if we're uploading to offline sync - if so, then user since must be >= fcsr, so we should report an error
598          if self.data_to_send["fcsr_filter"].lower() == "offline":
599              self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
600              self.exit_upload()
601          self.data_to_send["since"] = fcsr
602  
603      self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
604          % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))
605  
606      """
607      Post validation processing assuming destination since is now valid.
608  
609      Because we don't have an sqlite database to query (everything's in a dictionary),
610      we have to go through the IOVs manually to find the greatest since that's less than
611      the destination since.
612  
613      Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
614      """
615  
616      max_since_below_dest = self.data_to_send["iovs"][0]["since"]
617      for (i, iov) in enumerate(self.data_to_send["iovs"]):
618          if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
619              max_since_below_dest = self.data_to_send["iovs"][i]["since"]
620  
621      # only select iovs that have sinces >= max_since_below_dest
622      # and then shift any IOVs left to the destination since
623      self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
624      for (i, iov) in enumerate(self.data_to_send["iovs"]):
625          if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
626              self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]
627  
628      # modify insertion_time of iovs
629      new_time = to_timestamp(datetime.now())
630      for (i, iov) in enumerate(self.data_to_send["iovs"]):
631          self.data_to_send["iovs"][i]["insertion_time"] = new_time

References uploads.uploader._outputter, uploads.uploader.data_to_send, uploads.uploader.exit_upload(), uploads.friendly_since(), uploads.uploader.get_fcsr_from_server(), createfilelist.int, conddb_time.to_timestamp(), and writeEcalDQMStatus.write.
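A worked example of the shifting step may help; the IOV list and sinces below are invented, and the dictionaries are stripped down to the one key the algorithm touches.

iovs = [{"since": 100}, {"since": 200}, {"since": 304}, {"since": 310}]
dest_since = 305   # the user's since after being raised to the FCSR

# find the greatest since that is still <= the destination since (304 here)
max_since_below_dest = iovs[0]["since"]
for iov in iovs:
    if max_since_below_dest < iov["since"] <= dest_since:
        max_since_below_dest = iov["since"]

# drop earlier IOVs, then pull the surviving low since up to the destination
iovs = [iov for iov in iovs if iov["since"] >= max_since_below_dest]
for iov in iovs:
    if iov["since"] < dest_since:
        iov["since"] = dest_since

print(iovs)   # [{'since': 305}, {'since': 310}] - coverage starts exactly at 305, no hole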

◆ get_all_hashes()

def uploads.uploader.get_all_hashes (   self)
Get all the hashes from the dictionary of IOVs we have from the SQLite file.

Definition at line 633 of file uploads.py.

633  def get_all_hashes(self):
634      """
635      Get all the hashes from the dictionary of IOVs we have from the SQLite file.
636      """
637      self._outputter.write("\tGetting list of all hashes found in SQLite database.", output.DEBUG)
638      hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
639      self._outputter.write("Found %i local Payload(s) referenced in IOVs" % len(hashes), output.INFO)
640      return hashes

References uploads.uploader._outputter, errors.check_response(), uploads.uploader.data_to_send, genParticles_cff.map, and writeEcalDQMStatus.write.

Referenced by uploads.uploader.get_hashes_to_send().
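Note that this method relies on Python 2 semantics: map() there returns a list, so the len() call works. A sketch of a version that behaves the same under Python 3, where map() returns an iterator:

data_to_send = {"iovs": [{"payload_hash": "aa11"}, {"payload_hash": "bb22"}]}  # invented sample

# a list comprehension yields a real list on both Python 2 and 3,
# so len() and repeated iteration keep working
hashes = [iov["payload_hash"] for iov in data_to_send["iovs"]]
assert len(hashes) == 2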

◆ get_fcsr_from_server()

def uploads.uploader.get_fcsr_from_server (   self,
  upload_session_id 
)
Execute the HTTPS request to ask the server for the FCSR.

Note: we do this in a separate function so we can do the decoding check for JSON data with check_response.

Definition at line 553 of file uploads.py.

553  def get_fcsr_from_server(self, upload_session_id):
554      """
555      Execute the HTTPS request to ask the server for the FCSR.
556  
557      Note: we do this in a separate function so we can do the decoding check for JSON data with check_response.
558      """
559      # tiny amount of client-side logic here - all of the work is done on the server
560      url_data = {
561          "database" : self.data_to_send["destinationDatabase"],
562          "upload_session_id" : upload_session_id,
563          "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
564          "sourceTagSync" : self.data_to_send["fcsr_filter"]
565      }
566      query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
567      result = query.send()
568      return result

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, and relativeConstraints.keys.

Referenced by uploads.uploader.filter_iovs_by_fcsr().

◆ get_hashes_to_send()

def uploads.uploader.get_hashes_to_send (   self,
  upload_session_id 
)
Get the hashes of the payloads we want to send that the server doesn't have yet.

Definition at line 643 of file uploads.py.

643  def get_hashes_to_send(self, upload_session_id):
644      """
645      Get the hashes of the payloads we want to send that the server doesn't have yet.
646      """
647      self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.", output.DEBUG)
648      post_data = json.dumps(self.get_all_hashes())
649      url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
650      query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
651      response = query.send()
652      return response

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, uploads.uploader.get_all_hashes(), and writeEcalDQMStatus.write.

◆ get_tag_dictionary()

def uploads.uploader.get_tag_dictionary (   self)

Definition at line 248 of file uploads.py.

248  def get_tag_dictionary(self):
249      url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
250      request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
251      response = request.send()
252      return response

References uploads.uploader._SERVICE_URL, and uploads.uploader.metadata_source.

◆ get_upload_session_id()

def uploads.uploader.get_upload_session_id (   self)
Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
as long as the upload session is still open.

Definition at line 515 of file uploads.py.

515  def get_upload_session_id(self):
516      """
517      Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
518      as long as the upload session is still open.
519      """
520      self._outputter.write("Getting upload session.", output.VERBOSE)
521  
522      # send password in the body so it can be encrypted over https
523      # username and password are taken from the netrc file
524      # at this point, the value in username_or_token is always a username, since
525      # this method's end result is obtaining a token.
526      body_data = base64.b64encode(json.dumps(
527          {
528              "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
529              "username_or_token" : self.data_to_send["username"],
530              "password" : self.data_to_send["password"]
531          }
532      ))
533  
534      url_data = {"database" : self.data_to_send["destinationDatabase"]}
535  
536      query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
537      response = query.send()
538      return response

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, relativeConstraints.keys, and writeEcalDQMStatus.write.

Referenced by uploads.uploader.upload().
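The encoding step can be reproduced on its own. The credential values below are placeholders (the real client reads them from a netrc file), and the .encode() call is needed on Python 3, where the Python 2 code above passes a str directly:

import base64
import json

credentials = {
    "destinationTag": "ExampleTag_v1",   # placeholder values throughout
    "username_or_token": "someuser",
    "password": "***",
}

# JSON-encode, then base64-encode; the result travels in the HTTPS request body
# rather than in the URL, so the credentials are protected by TLS
body_data = base64.b64encode(json.dumps(credentials).encode("utf-8"))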

◆ send_blob()

def uploads.uploader.send_blob (   self,
  payload,
  upload_session_id 
)
Send the BLOB of a payload over HTTP.
The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.

Definition at line 705 of file uploads.py.

705  def send_blob(self, payload, upload_session_id):
706      """
707      Send the BLOB of a payload over HTTP.
708      The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
709      """
710      # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPS request
711      blob_data = base64.b64encode(payload["data"])
712  
713      url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
714  
715      # construct the data to send in the body and header of the HTTPS request
716      for key in payload.keys():
717          # skip blob
718          if key != "data":
719              if key == "insertion_time":
720                  url_data[key] = to_timestamp(payload[key])
721              else:
722                  url_data[key] = payload[key]
723  
724      request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)
725  
726      # send the request and return the response
727      # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
728      try:
729          request_response = request.send()
730          return request_response
731      except Exception as e:
732          # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
733          if isinstance(e, errors.NoMoreRetriesException):
734              self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
735              self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
736          return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, str, conddb_time.to_timestamp(), and writeEcalDQMStatus.write.

Referenced by uploads.uploader.send_payloads().
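To make the transport split concrete, here is a standalone sketch; the Payload record is invented, with insertion_time already converted to the timestamp string that to_timestamp() would produce:

import base64

payload = {
    "hash": "4b1c0a7e",                            # invented values throughout
    "object_type": "ExampleCondFormat",
    "version": "1.0",
    "insertion_time": "2020-01-01 00:00:00,000",
    "data": b"\x00\x01raw conditions blob",
}

# the binary BLOB is base64-encoded and sent as the request body...
blob_data = base64.b64encode(payload["data"])

# ...while every other column becomes a URL parameter of store_payload/
url_data = {key: value for key, value in payload.items() if key != "data"}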

◆ send_metadata()

def uploads.uploader.send_metadata (   self,
  upload_session_id 
)
Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
The server closes the session (and releases the tag lock) after processing has been completed.

Definition at line 739 of file uploads.py.

739  def send_metadata(self, upload_session_id):
740      """
741      Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
742      The server closes the session (and releases the tag lock) after processing has been completed.
743      """
744  
745      # set user text if it's empty
746      if self.data_to_send["userText"] in ["", None]:
747          self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % self.data_to_send["destinationTags"].keys()[0]
748  
749      self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
750          % self.upload_session_id, output.VERBOSE)
751  
752      # send the HTTPS request to the server
753      url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
754      request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
755      response = request.send()
756      self._outputter.write("Response received - conditions upload process complete.", output.VERBOSE)
757      return response

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, relativeConstraints.keys, data_sources.json_data_node.make(), uploadConditions.parse_arguments(), print(), uploads.uploader.upload_session_id, and writeEcalDQMStatus.write.

◆ send_payloads()

def uploads.uploader.send_payloads (   self,
  hashes,
  upload_session_id 
)
Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.

Definition at line 654 of file uploads.py.

654  def send_payloads(self, hashes, upload_session_id):
655      """
656      Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
657      """
658      # if we have no hashes, we can't send anything
659      # but don't exit since it might mean all the Payloads were already on the server
660      if len(hashes) == 0:
661          self._outputter.write("No payloads to send - moving to IOV upload.")
662          return True
663      else:
664          self._outputter.write("Sending payloads of hashes not found:")
665          # construct connection string for local SQLite database file
666          database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if isinstance(self.sqlite_file_name, str) else self.sqlite_file_name
667          # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW)
668          self._outputter.write("\tConnecting to input SQLite database.")
669          con = querying.connect(database, map_blobs=True)
670  
671          # query for the Payloads
672          self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
673          payloads = con.payload(hash=hashes)
674          # if we get a single Payload back, put it in a list and turn it into a json_list
675          if payloads.__class__ != data_sources.json_list:
676              payloads = data_sources.json_data_node.make([payloads])
677  
678          # close the session with the SQLite database file - we won't use it again
679          con.close_session()
680  
681          # if we found some Payloads, send them
682          if payloads:
683              # Note: there is an edge case in which the SQLite file could have been queried
684              # to delete the Payloads since we queried it for IOV hashes. This may be handled in the next iteration.
685              # send http post with data blob in body, and everything else as URL parameters
686              # convert Payload to a dictionary - we can put most of this into the URL of the HTTPS request
687              dicts = payloads.as_dicts()
688              self._outputter.write("Uploading Payload BLOBs:")
689  
690              # for each payload, send the BLOB to the server
691              for n, payload in enumerate(dicts):
692                  self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
693                  response = self.send_blob(payload, upload_session_id)
694                  # check response for errors
695                  no_error = self.check_response_for_error_key(response, exit_if_error=True)
696                  if not(no_error):
697                      return False
698                  self._outputter.write("\tPayload sent - moving to next one.")
699              self._outputter.write("All Payloads uploaded.")
700              return True
701          else:
702              return False

References uploads.uploader._outputter, errors.check_response(), uploads.uploader.check_response_for_error_key(), querying.connect(), data_sources.json_data_node.make(), uploads.uploader.send_blob(), uploads.uploader.sqlite_file_name, and writeEcalDQMStatus.write.

◆ upload()

def uploads.uploader.upload (   self)
Calls methods that send HTTP requests to the upload server.
Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.

Definition at line 352 of file uploads.py.

352  def upload(self):
353      """
354      Calls methods that send HTTP requests to the upload server.
355      """
356  
357      """
358      Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
359      """
360      try:
361  
362          # get upload session, check response for error key
363          upload_session_data = self.get_upload_session_id()
364          no_error = self.check_response_for_error_key(upload_session_data)
365  
366          # if there was an error and we're testing, return False for the testing module
367          if not(no_error) and self._testing:
368              return False
369  
370          self.upload_session_id = upload_session_data["id"]
371          self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id, output.DEBUG)
372          self.server_side_log_file = upload_session_data["log_file"]
373  
374      except errors.NoMoreRetriesException as no_more_retries:
375          return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
376      except Exception as e:
377          # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
378          self._outputter.write(traceback.format_exc(), output.ERROR)
379  
380          if not(self._verbose):
381              self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
382          else:
383              self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
384  
385          return self.exit_upload()
386  
387      """
388      Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
389      """
390      if self.data_to_send["fcsr_filter"] != None:
391          """
392          FCSR Filtering:
393          Filtering the IOVs before we send them by getting the First Conditions Safe Run
394          from the server based on the target synchronization type.
395          """
396          if self.data_to_send["inputTagData"]["time_type"] != "Time":
397              # if we have a time-based tag, we can't do FCSR validation - this is also the case on the server side
398              try:
399                  self.filter_iovs_by_fcsr(self.upload_session_id)
400                  # this function does not return a value, since it just operates on data - so no point checking for an error key
401                  # the error key check is done inside the function on the response from the server
402              except errors.NoMoreRetriesException as no_more_retries:
403                  return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
404              except Exception as e:
405                  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
406                  self._outputter.write(traceback.format_exc(), output.ERROR)
407  
408                  if not(self._verbose):
409                      self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
410                  else:
411                      self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
412  
413                  return self.exit_upload()
414          else:
415              self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
416  
417      """
418      Check for the hashes that the server doesn't have - only send these (but in the next step).
419      """
420      try:
421  
422          check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
423          # check for an error key in the response
424          no_error = self.check_response_for_error_key(check_hashes_response)
425  
426          # if there was an error and we're testing, return False for the testing module
427          if not(no_error) and self._testing:
428              return False
429  
430          # finally, check hashes_not_found against hashes not found locally - if there is an intersection, we stop the upload
431          # because if a hash is not found and is not on the server, there is no data to upload
432          all_hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
433          hashes_not_found = check_hashes_response["hashes_not_found"]
434          hashes_found = list(set(all_hashes) - set(hashes_not_found))
435          self._outputter.write("Checking for IOVs that have no Payload locally or on the server.", output.VERBOSE)
436          # check if any hash not found on the server is also missing from the local SQLite database
437          for hash_not_found in hashes_not_found:
438              if hash_not_found in self.hashes_with_no_local_payload:
439                  return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
440  
441          for hash_found in hashes_found:
442              if hash_found in self.hashes_with_no_local_payload:
443                  self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found, output.VERBOSE)
444  
445          self._outputter.write("Found %i Payloads in remote server" % len(hashes_found), output.INFO)
446          self._outputter.write("Found %i Payloads not in remote server" % len(hashes_not_found), output.INFO)
447  
448          self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.", output.VERBOSE)
449  
450      except errors.NoMoreRetriesException as no_more_retries:
451          # for now, just write the log if we get a NoMoreRetriesException
452          return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
453      except Exception as e:
454          # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
455          self._outputter.write(traceback.format_exc(), output.ERROR)
456  
457          if not(self._verbose):
458              self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
459          else:
460              self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
461  
462          return self.exit_upload()
463  
464      """
465      Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
466      exception handling is done inside this method, since it calls a method itself for each payload.
467      """
468      send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
469      if self._testing and not(send_payloads_response):
470          return False
471  
472      """
473      Final stage - send metadata to server (since the payloads are there now)
474      if this is successful, once it finishes the upload session is closed on the server and the tag lock is released.
475      """
476      try:
477  
478          # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
479          send_metadata_response = self.send_metadata(self.upload_session_id)
480  
481          no_error = self.check_response_for_error_key(send_metadata_response)
482          if not(no_error) and self._testing:
483              return False
484  
485          try:
486              self._outputter.write(send_metadata_response["summary"], output.INFO)
487          except KeyError:
488              pass
489  
490          # we have to call this explicitly here since check_response_for_error_key only writes the log file
491          # if an error has occurred, whereas it should always be written here
492          self.write_server_side_log(self._log_data)
493  
494      except errors.NoMoreRetriesException as no_more_retries:
495          return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
496      except Exception as e:
497          # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
498          self._outputter.write(traceback.format_exc(), output.ERROR)
499  
500          if not(self._verbose):
501              self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
502          else:
503              self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
504  
505          return self.exit_upload()
506  
507      # close client side log handle
508      self._handle.close()
509  
510      # if we're running the testing script, return True to say the upload has worked
511      if self._testing:
512          return True

References uploads.uploader._outputter, uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.get_upload_session_id(), uploads.uploader.upload_session_id, and writeEcalDQMStatus.write.
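Because every stage returns False instead of exiting when testing=True, the whole pipeline can be driven from a test harness. A minimal sketch, reusing the hypothetical DictSource metadata object from the class description above:

session = uploads.uploader(metadata_source=metadata, testing=True)  # metadata as in the constructor example
if session.upload():
    print("all stages succeeded: session, FCSR filter, hash check, payloads, metadata")
else:
    print("a stage failed - check upload_logs/ and server_side_logs/ for details")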

◆ write_server_side_log()

def uploads.uploader.write_server_side_log (   self,
  log_data 
)
Given the log data from the server, write it to a client-side log file.

Definition at line 290 of file uploads.py.

290  def write_server_side_log(self, log_data):
291      """
292      Given the log data from the server, write it to a client-side log file.
293      """
294      # if the server_side_log directory doesn't exist, create it
295      # without it we can't write the log when we download it from the server
296      if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
297          os.makedirs("server_side_logs/")
298  
299      # directory exists now, write to client-side log file
300      server_log_file_name = None
301      try:
302          # if the upload session does not exist yet, don't try to write the log file
303          if self.upload_session_id == None:
304              raise Exception("No upload session")
305          # create a write handle to the file, decode the log data from base64, write and close
306          server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
307          handle = open(server_log_file_name, "w")
308          handle.write(base64.b64decode(log_data))
309          handle.close()
310      except Exception as e:
311          # reset log file name to None so we don't try to write it later
312          server_log_file_name = None
313          #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")
314  
315      # tell the user where the log files are
316      # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
317      if self._SERVICE_URL.startswith("https://cms-conddb-dev.cern.ch/cmsDbCondUpload"):
318          logUrl = "https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prep/%s" % self.upload_session_id
319      else:
320          logUrl = "https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prod/%s" % self.upload_session_id
321  
322      print("[%s] INFO: Server log found at %s." % (datetime.now(), logUrl))
323      if server_log_file_name != None:
324          print("[%s] INFO: Local copy of server log file at '%s'." % (datetime.now(), server_log_file_name))
325      else:
326          print("No server log file could be written locally.")
327  
328      print("[%s] INFO: Local copy of client log file at '%s'." % (datetime.now(), self.upload_log_file_name))

References uploads.uploader._SERVICE_URL, print(), str, uploads.uploader.upload_log_file_name, and uploads.uploader.upload_session_id.

Referenced by uploads.uploader.check_response_for_error_key(), and uploads.uploader.exit_upload().

Member Data Documentation

◆ _debug

uploads.uploader._debug
private

◆ _handle

uploads.uploader._handle
private

Definition at line 123 of file uploads.py.

Referenced by uploads.uploader.exit_upload().

◆ _log_data

uploads.uploader._log_data
private

◆ _outputter

uploads.uploader._outputter
private

◆ _SERVICE_URL

uploads.uploader._SERVICE_URL
private

◆ _testing

uploads.uploader._testing
private

◆ _verbose

uploads.uploader._verbose
private

◆ data_to_send

uploads.uploader.data_to_send

◆ hashes_with_no_local_payload

uploads.uploader.hashes_with_no_local_payload

Definition at line 199 of file uploads.py.

◆ input_tag

uploads.uploader.input_tag

Definition at line 163 of file uploads.py.

◆ metadata_source

uploads.uploader.metadata_source

Definition at line 135 of file uploads.py.

Referenced by uploads.uploader.get_tag_dictionary().

◆ server_side_log_file

uploads.uploader.server_side_log_file

Definition at line 372 of file uploads.py.

◆ sqlite_file_name

uploads.uploader.sqlite_file_name

Definition at line 169 of file uploads.py.

Referenced by uploads.uploader.send_payloads().

◆ upload_log_file_name

uploads.uploader.upload_log_file_name

Definition at line 122 of file uploads.py.

Referenced by uploads.uploader.write_server_side_log().

◆ upload_session_id

uploads.uploader.upload_session_id