CMS 3D CMS Logo

List of all members | Public Member Functions | Public Attributes | Private Attributes
uploads.uploader Class Reference
Inheritance diagram for uploads.uploader:

Public Member Functions

def __init__ (self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs)
 
def check_response_for_error_key (self, response_dict, exit_if_error=True)
 
def close_upload_session (self, upload_session_id)
 
def exit_upload (self, message=None)
 
def filter_iovs_by_fcsr (self, upload_session_id)
 
def get_all_hashes (self)
 
def get_fcsr_from_server (self, upload_session_id)
 
def get_hashes_to_send (self, upload_session_id)
 
def get_tag_dictionary (self)
 
def get_upload_session_id (self)
 
def send_blob (self, payload, upload_session_id)
 
def send_metadata (self, upload_session_id)
 
def send_payloads (self, hashes, upload_session_id)
 
def upload (self)
 
def write_server_side_log (self, log_data)
 

Public Attributes

 data_to_send
 
 hashes_with_no_local_payload
 
 input_tag
 
 metadata_source
 
 server_side_log_file
 
 sqlite_file_name
 
 upload_log_file_name
 
 upload_session_id
 

Private Attributes

 _debug
 
 _handle
 
 _log_data
 
 _outputter
 
 _SERVICE_URL
 
 _testing
 
 _verbose
 

Detailed Description

Upload session controller - creates, tracks, and deletes upload sessions on the server.

Definition at line 83 of file uploads.py.

Constructor & Destructor Documentation

◆ __init__()

def uploads.uploader.__init__ (   self,
  metadata_source = None,
  debug = False,
  verbose = False,
  testing = False,
  server = "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/",
**  kwargs 
)
Upload constructor:
Given an SQLite file and a Metadata sources, reads into a dictionary read for it to be encoded and uploaded.

Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

Note: default value of service_url should be changed for production.

Definition at line 88 of file uploads.py.

88  def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs): """
89  Upload constructor:
90  Given an SQLite file and a Metadata sources, reads into a dictionary read for it to be encoded and uploaded.
91 
92  Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
93 
94  Note: default value of service_url should be changed for production.
95  """
96  # set private variables
97  self._debug = debug
98  self._verbose = verbose
99  self._testing = testing
100  # initialise server-side log data as empty string - will be replaced when we get a response back from the server
101  self._log_data = ""
102  self._SERVICE_URL = server
103  self.upload_session_id = None
104 
105  # set up client-side log file
106  self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
107  self._handle = open(self.upload_log_file_name, "a")
108 
109  # set up client-side logging object
110  self._outputter = output(verbose=verbose, log_handle=self._handle)
111  self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)
112 
113  # expect a CondDBFW data_source object for metadata_source
114  if metadata_source == None:
115  # no upload metadat has been given - we cannot continue with the upload
116  self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
117  else:
118  # set up global metadata source variable
119  self.metadata_source = metadata_source.data()
120 
121  # check for the destination tag
122  # this is required whatever type of upload we're performing
123  if self.metadata_source.get("destinationTags") == None:
124  self.exit_upload("No destination Tag was given.")
125  else:
126  if isinstance(self.metadata_source.get("destinationTags"), dict) and self.metadata_source.get("destinationTags").keys()[0] == None:
127  self.exit_upload("No destination Tag was given.")
128 
129  # make sure a destination database was given
130  if self.metadata_source.get("destinationDatabase") == None:
131  self.exit_upload("No destination database was given.")
132 
133  # get Conditions metadata
134  if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
135  """
136  If we have neither an sqlite file nor the command line data
137  """
138  self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
139  + "\nSee --help for command line argument information.")
140  elif self.metadata_source.get("sourceDB") != None:
141  """
142  We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
143  We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
144  """
145 
146  # make sure we have an input tag to look for in the source db
147  self.input_tag = metadata_source.data().get("inputTag")
148  if self.input_tag == None:
149  self.exit_upload("No input Tag name was given.")
150 
151  # set empty dictionary to contain Tag and IOV data from SQLite
152  result_dictionary = {}
153  self.sqlite_file_name = self.metadata_source["sourceDB"]
154  if not(os.path.isfile(self.sqlite_file_name)):
155  self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
156  sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))
157 
158  self._outputter.write("Getting Tag and IOVs from SQLite database.")
159 
160  # query for Tag, check for existence, then convert to dictionary
161  tag = sqlite_con.tag(name=self.input_tag)
162  if tag == None:
163  self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
164  tag = tag.as_dicts(convert_timestamps=True)
165 
166  # query for IOVs, check for existence, then convert to dictionaries
167  iovs = sqlite_con.iov(tag_name=self.input_tag)
168  if iovs == None:
169  self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
170  iovs = iovs.as_dicts(convert_timestamps=True)
171  iovs = [iovs] if not isinstance(iovs, list) else iovs
172 
173  """
174  Finally, get the list of all Payload hashes of IOVs,
175  then compute the list of hashes for which there is no Payload for
176  this is used later to decide if we can continue the upload if the Payload was not found on the server.
177  """
178  iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
179  if iovs_for_hashes.__class__ == data_sources.json_list:
180  hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
181  else:
182  hashes_of_iovs = [iovs_for_hashes.payload_hash]
183  self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]
184 
185  # close session open on SQLite database file
186  sqlite_con.close_session()
187 
188  elif metadata_source.data().get("hashToUse") != None:
189  """
190  Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
191  We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
192  """
193 
194  # set empty dictionary to contain Tag and IOV data from command line
195  result_dictionary = {}
196 
197  now = to_timestamp(datetime.now())
198  # tag dictionary will be taken from the server
199  # this does not require any authentication
200  tag = self.get_tag_dictionary()
201  self.check_response_for_error_key(tag)
202  iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
203  "insertion_time" : now}]
204 
205  # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
206  self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]
207 
208  # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side
209 
210  # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
211  if tag["time_type"] == "Run":
212  for (i, iov) in enumerate(iovs):
213  iovs[i]["since"] = iovs[i]["since"] << 32
214 
215  result_dictionary = {"inputTagData" : tag, "iovs" : iovs}
216 
217  # add command line arguments to dictionary
218  # remembering that metadata_source is a json_dict object
219  result_dictionary.update(metadata_source.data())
220 
221  # store in instance variable
222  self.data_to_send = result_dictionary
223 
224  # if the since doesn't exist, take the first since from the list of IOVs
225  if result_dictionary.get("since") == None:
226  result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
227  elif self.data_to_send["inputTagData"]["time_type"] == "Run":
228  # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
229  self.data_to_send["since"] = self.data_to_send["since"] << 32
230 
231  """
232  TODO - Settle on a single destination tag format.
233  """
234  # look for deprecated metadata entries - give warnings
235  # Note - we only really support this format
236  try:
237  if isinstance(result_dictionary["destinationTags"], dict):
238  self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
239  except Exception as e:
240  self._outputter.write("ERROR: %s" % str(e))
241 
242 

Member Function Documentation

◆ check_response_for_error_key()

def uploads.uploader.check_response_for_error_key (   self,
  response_dict,
  exit_if_error = True 
)
Checks the decoded response of an HTTP request to the server.
If it is a dictionary, and one of its keys is "error", the server returned an error

Definition at line 250 of file uploads.py.

250  def check_response_for_error_key(self, response_dict, exit_if_error=True):
251  """
252  Checks the decoded response of an HTTP request to the server.
253  If it is a dictionary, and one of its keys is "error", the server returned an error
254  """
255  # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
256  if isinstance(response_dict, dict) and "error" in response_dict.keys():
257  splitter_string = "\n%s\n" % ("-"*50)
258  self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
259  self._outputter.write(response_dict["error"], ignore_verbose=True)
260 
261  # if the user has given the --debug flag, show the traceback as well
262  if self._debug:
263  # suggest to the user to email this to db upload experts
264  self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
265  if response_dict.get("traceback") != None:
266  self._outputter.write(response_dict["traceback"], ignore_verbose=True)
267  else:
268  self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
269  else:
270  self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)
271 
272  # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
273  # if an error has occurred on the server side, a log will have been written
274  self.write_server_side_log(response_dict.get("log_data"))
275 
276  if exit_if_error:
277  if self._testing:
278  return False
279  else:
280  exit()
281  elif not("error" in response_dict.keys()) and "log_data" in response_dict.keys():
282  # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
283  self._log_data = response_dict["log_data"]
284  return True
285 

References EcalMatacqAnalyzer._debug, EcalABAnalyzer._debug, EcalLaserAnalyzer2._debug, EcalLaserAnalyzer._debug, uploads.uploader._debug, uploads.uploader._log_data, uploads.uploader._outputter, uploads.uploader._testing, beamvalidation.exit(), writeEcalDQMStatus.write, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.exit_upload(), uploads.uploader.send_payloads(), and uploads.uploader.upload().

◆ close_upload_session()

def uploads.uploader.close_upload_session (   self,
  upload_session_id 
)
Close an upload session on the server by calling its close_upload_session end-point.
This is done if there is an error on the client-side.

Definition at line 522 of file uploads.py.

522  def close_upload_session(self, upload_session_id):
523  """
524  Close an upload session on the server by calling its close_upload_session end-point.
525  This is done if there is an error on the client-side.
526  """
527  self._outputter.write("An error occurred - closing the upload session on the server.")
528  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
529  query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
530  response = query.send()
531  return response
532 

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, and writeEcalDQMStatus.write.

Referenced by uploads.uploader.exit_upload().

◆ exit_upload()

def uploads.uploader.exit_upload (   self,
  message = None 
)
Used to exit the script - which only happens if an error has occurred.
If the --testing flag was passed by the user, we should return False for failure, and not exit

Definition at line 320 of file uploads.py.

320  def exit_upload(self, message=None):
321  """
322  Used to exit the script - which only happens if an error has occurred.
323  If the --testing flag was passed by the user, we should return False for failure, and not exit
324  """
325  if self.upload_session_id != None:
326  # only try to close the upload session if an upload session has been obtained
327  response = self.close_upload_session(self.upload_session_id)
328  no_error = self.check_response_for_error_key(response)
329  # if no error was found in the upload session closure request,
330  # we still have to write the server side log
331  if no_error:
332  self.write_server_side_log(self._log_data)
333  # close client-side log handle
334  self._handle.close()
335  if message != None:
336  print("\n%s\n" % message)
337  if self._testing:
338  return False
339  else:
340  exit()
341 

References uploads.uploader._handle, uploads.uploader._log_data, uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.close_upload_session(), beamvalidation.exit(), print(), uploads.uploader.upload_session_id, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.filter_iovs_by_fcsr().

◆ filter_iovs_by_fcsr()

def uploads.uploader.filter_iovs_by_fcsr (   self,
  upload_session_id 
)
Ask for the server for the FCSR based on the synchronization type of the source Tag.
Then, modify the IOVs (possibly remove some) based on the FCSR we received.
This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.

Definition at line 551 of file uploads.py.

551  def filter_iovs_by_fcsr(self, upload_session_id):
552  """
553  Ask for the server for the FCSR based on the synchronization type of the source Tag.
554  Then, modify the IOVs (possibly remove some) based on the FCSR we received.
555  This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
556  """
557  self._outputter.write("Getting the First Condition Safe Run for the current sync type.")
558 
559  fcsr_data = self.get_fcsr_from_server(upload_session_id)
560  fcsr = fcsr_data["fcsr"]
561  fcsr_changed = fcsr_data["fcsr_changed"]
562  new_sync = fcsr_data["new_sync"]
563 
564  if fcsr_changed:
565  self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))
566 
567  self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
568  % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))
569 
570  """
571  There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
572  Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
573  Note: this applies to run, lumi and timestamp run_types.
574  """
575 
576  # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
577  if fcsr > self.data_to_send["since"]:
578  # check if we're uploading to offline sync - if so, then user since must be >= fcsr, so we should report an error
579  if self.data_to_send["fcsr_filter"].lower() == "offline":
580  self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
581  self.exit_upload()
582  self.data_to_send["since"] = fcsr
583 
584  self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
585  % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))
586 
587  """
588  Post validation processing assuming destination since is now valid.
589 
590  Because we don't have an sqlite database to query (everything's in a dictionary),
591  we have to go through the IOVs manually find the greatest since that's less than
592  the destination since.
593 
594  Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
595  """
596 
597  max_since_below_dest = self.data_to_send["iovs"][0]["since"]
598  for (i, iov) in enumerate(self.data_to_send["iovs"]):
599  if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
600  max_since_below_dest = self.data_to_send["iovs"][i]["since"]
601 
602  # only select iovs that have sinces >= max_since_below_dest
603  # and then shift any IOVs left to the destination since
604  self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
605  for (i, iov) in enumerate(self.data_to_send["iovs"]):
606  if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
607  self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]
608 
609  # modify insertion_time of iovs
610  new_time = to_timestamp(datetime.now())
611  for (i, iov) in enumerate(self.data_to_send["iovs"]):
612  self.data_to_send["iovs"][i]["insertion_time"] = new_time
613 

References uploads.uploader._outputter, uploads.uploader.data_to_send, uploads.uploader.exit_upload(), uploads.friendly_since(), uploads.uploader.get_fcsr_from_server(), createfilelist.int, conddb_time.to_timestamp(), and writeEcalDQMStatus.write.

◆ get_all_hashes()

def uploads.uploader.get_all_hashes (   self)
Get all the hashes from the dictionary of IOVs we have from the SQLite file.

Definition at line 614 of file uploads.py.

614  def get_all_hashes(self):
615  """
616  Get all the hashes from the dictionary of IOVs we have from the SQLite file.
617  """
618  self._outputter.write("\tGetting list of all hashes found in SQLite database.")
619  hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
620  return hashes
621 

References uploads.uploader._outputter, errors.check_response(), uploads.uploader.data_to_send, genParticles_cff.map, and writeEcalDQMStatus.write.

Referenced by uploads.uploader.get_hashes_to_send().

◆ get_fcsr_from_server()

def uploads.uploader.get_fcsr_from_server (   self,
  upload_session_id 
)
Execute the HTTPs request to ask the server for the FCSR.

Note: we do this in a separate function we so we can do the decoding check for json data with check_response.

Definition at line 534 of file uploads.py.

534  def get_fcsr_from_server(self, upload_session_id):
535  """
536  Execute the HTTPs request to ask the server for the FCSR.
537 
538  Note: we do this in a separate function we so we can do the decoding check for json data with check_response.
539  """
540  # tiny amount of client-side logic here - all of the work is done on the server
541  url_data = {
542  "database" : self.data_to_send["destinationDatabase"],
543  "upload_session_id" : upload_session_id,
544  "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
545  "sourceTagSync" : self.data_to_send["fcsr_filter"]
546  }
547  query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
548  result = query.send()
549  return result
550 

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, and relativeConstraints.keys.

Referenced by uploads.uploader.filter_iovs_by_fcsr().

◆ get_hashes_to_send()

def uploads.uploader.get_hashes_to_send (   self,
  upload_session_id 
)
Get the hashes of the payloads we want to send that the server doesn't have yet.

Definition at line 623 of file uploads.py.

623  def get_hashes_to_send(self, upload_session_id):
624  """
625  Get the hashes of the payloads we want to send that the server doesn't have yet.
626  """
627  self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
628  post_data = json.dumps(self.get_all_hashes())
629  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
630  query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
631  response = query.send()
632  return response
633 

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, uploads.uploader.get_all_hashes(), and writeEcalDQMStatus.write.

◆ get_tag_dictionary()

def uploads.uploader.get_tag_dictionary (   self)

Definition at line 244 of file uploads.py.

244  def get_tag_dictionary(self):
245  url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
246  request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
247  response = request.send()
248  return response
249 

References uploads.uploader._SERVICE_URL, and uploads.uploader.metadata_source.

◆ get_upload_session_id()

def uploads.uploader.get_upload_session_id (   self)
Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
as long as the upload session is still open.

Definition at line 496 of file uploads.py.

496  def get_upload_session_id(self):
497  """
498  Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
499  as long as the upload session is still open.
500  """
501  self._outputter.write("Getting upload session.")
502 
503  # send password in the body so it can be encrypted over https
504  # username and password are taken from the netrc file
505  # at this point, the value in username_or_token is always a username, since
506  # this method's end result is obtaining a token.
507  body_data = base64.b64encode(json.dumps(
508  {
509  "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
510  "username_or_token" : self.data_to_send["username"],
511  "password" : self.data_to_send["password"]
512  }
513  ))
514 
515  url_data = {"database" : self.data_to_send["destinationDatabase"]}
516 
517  query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
518  response = query.send()
519  return response
520 

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, relativeConstraints.keys, and writeEcalDQMStatus.write.

Referenced by uploads.uploader.upload().

◆ send_blob()

def uploads.uploader.send_blob (   self,
  payload,
  upload_session_id 
)
Send the BLOB of a payload over HTTP.
The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.

Definition at line 685 of file uploads.py.

685  def send_blob(self, payload, upload_session_id):
686  """
687  Send the BLOB of a payload over HTTP.
688  The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
689  """
690  # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPs request
691  blob_data = base64.b64encode(payload["data"])
692 
693  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
694 
695  # construct the data to send in the body and header of the HTTPs request
696  for key in payload.keys():
697  # skip blob
698  if key != "data":
699  if key == "insertion_time":
700  url_data[key] = to_timestamp(payload[key])
701  else:
702  url_data[key] = payload[key]
703 
704  request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)
705 
706  # send the request and return the response
707  # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
708  try:
709  request_response = request.send()
710  return request_response
711  except Exception as e:
712  # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
713  if isinstance(e, errors.NoMoreRetriesException):
714  self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
715  self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
716  return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})
717 

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, str, conddb_time.to_timestamp(), and writeEcalDQMStatus.write.

Referenced by uploads.uploader.send_payloads().

◆ send_metadata()

def uploads.uploader.send_metadata (   self,
  upload_session_id 
)
Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
The server closes the session (and releases the tag lock) after processing has been completed.

Definition at line 719 of file uploads.py.

719  def send_metadata(self, upload_session_id):
720  """
721  Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
722  The server closes the session (and releases the tag lock) after processing has been completed.
723  """
724 
725  # set user text if it's empty
726  if self.data_to_send["userText"] in ["", None]:
727  self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % self.data_to_send["destinationTags"].keys()[0]
728 
729  self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
730  % self.upload_session_id)
731 
732  # sent the HTTPs request to the server
733  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
734  request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
735  response = request.send()
736  self._outputter.write("Response received - conditions upload process complete.")
737  return response
738 

References uploads.uploader._outputter, uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, relativeConstraints.keys, data_sources.json_data_node.make(), uploadConditions.parse_arguments(), print(), uploads.uploader.upload_session_id, and writeEcalDQMStatus.write.

◆ send_payloads()

def uploads.uploader.send_payloads (   self,
  hashes,
  upload_session_id 
)
Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.

Definition at line 634 of file uploads.py.

634  def send_payloads(self, hashes, upload_session_id):
635  """
636  Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
637  """
638  # if we have no hashes, we can't send anything
639  # but don't exit since it might mean all the Payloads were already on the server
640  if len(hashes) == 0:
641  self._outputter.write("No hashes to send - moving to metadata upload.")
642  return True
643  else:
644  self._outputter.write("Sending payloads of hashes not found:")
645  # construct connection string for local SQLite database file
646  database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if isinstance(self.sqlite_file_name, str) else self.sqlite_file_name
647  # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW)
648  self._outputter.write("\tConnecting to input SQLite database.")
649  con = querying.connect(database, map_blobs=True)
650 
651  # query for the Payloads
652  self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
653  payloads = con.payload(hash=hashes)
654  # if we get a single Payload back, put it in a list and turn it into a json_list
655  if payloads.__class__ != data_sources.json_list:
656  payloads = data_sources.json_data_node.make([payloads])
657 
658  # close the session with the SQLite database file - we won't use it again
659  con.close_session()
660 
661  # if found some Payloads, send them
662  if payloads:
663  # Note: there is an edge case in which the SQLite file could have been queried
664  # to delete the Payloads since we queried it for IOV hashes. This may be handled in the next iteration.
665  # send http post with data blob in body, and everything else as URL parameters
666  # convert Payload to a dictionary - we can put most of this into the URL of the HTTPs request
667  dicts = payloads.as_dicts()
668  self._outputter.write("Uploading Payload BLOBs:")
669 
670  # for each payload, send the BLOB to the server
671  for n, payload in enumerate(dicts):
672  self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
673  response = self.send_blob(payload, upload_session_id)
674  # check response for errors
675  no_error = self.check_response_for_error_key(response, exit_if_error=True)
676  if not(no_error):
677  return False
678  self._outputter.write("\tPayload sent - moving to next one.")
679  self._outputter.write("All Payloads uploaded.")
680  return True
681  else:
682  return False
683 

References uploads.uploader._outputter, errors.check_response(), uploads.uploader.check_response_for_error_key(), querying.connect(), data_sources.json_data_node.make(), uploads.uploader.send_blob(), uploads.uploader.sqlite_file_name, and writeEcalDQMStatus.write.

◆ upload()

def uploads.uploader.upload (   self)
Calls methods that send HTTP requests to the upload server.
Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.

Definition at line 342 of file uploads.py.

342  def upload(self):
343  """
344  Calls methods that send HTTP requests to the upload server.
345  """
346 
347  """
348  Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
349  """
350  try:
351 
352  # get upload session, check response for error key
353  upload_session_data = self.get_upload_session_id()
354  no_error = self.check_response_for_error_key(upload_session_data)
355 
356  # if there was an error and we're testing, return False for the testing module
357  if not(no_error) and self._testing:
358  return False
359 
360  self.upload_session_id = upload_session_data["id"]
361  self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
362  self.server_side_log_file = upload_session_data["log_file"]
363 
364  except errors.NoMoreRetriesException as no_more_retries:
365  return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
366  except Exception as e:
367  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
368  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
369 
370  if not(self._verbose):
371  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
372  else:
373  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
374 
375  return self.exit_upload()
376 
377  """
378  Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
379  """
380  if self.data_to_send["fcsr_filter"] != None:
381  """
382  FCSR Filtering:
383  Filtering the IOVs before we send them by getting the First Conditions Safe Run
384  from the server based on the target synchronization type.
385  """
386  if self.data_to_send["inputTagData"]["time_type"] != "Time":
387  # if we have a time-based tag, we can't do FCSR validation - this is also the case on the server side
388  try:
389  self.filter_iovs_by_fcsr(self.upload_session_id)
390  # this function does not return a value, since it just operates on data - so no point checking for an error key
391  # the error key check is done inside the function on the response from the server
392  except errors.NoMoreRetriesException as no_more_retries:
393  return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
394  except Exception as e:
395  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
396  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
397 
398  if not(self._verbose):
399  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
400  else:
401  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
402 
403  return self.exit_upload()
404  else:
405  self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
406 
407  """
408  Check for the hashes that the server doesn't have - only send these (but in the next step).
409  """
410  try:
411 
412  check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
413  # check for an error key in the response
414  no_error = self.check_response_for_error_key(check_hashes_response)
415 
416  # if there was an error and we're testing, return False for the testing module
417  if not(no_error) and self._testing:
418  return False
419 
420  # finally, check hashes_not_found with hashes not found locally - if there is an intersection, we stop the upload
421  # because if a hash is not found and is not on the server, there is no data to upload
422  all_hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
423  hashes_not_found = check_hashes_response["hashes_not_found"]
424  hashes_found = list(set(all_hashes) - set(hashes_not_found))
425  self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
426  # check if any hashes not found on the server is used in the local SQLite database
427  for hash_not_found in hashes_not_found:
428  if hash_not_found in self.hashes_with_no_local_payload:
429  return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
430 
431  for hash_found in hashes_found:
432  if hash_found in self.hashes_with_no_local_payload:
433  self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)
434 
435  self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")
436 
437  except errors.NoMoreRetriesException as no_more_retries:
438  # for now, just write the log if we get a NoMoreRetriesException
439  return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
440  except Exception as e:
441  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
442  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
443 
444  if not(self._verbose):
445  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
446  else:
447  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
448 
449  return self.exit_upload()
450 
451  """
452  Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
453  exception handling is done inside this method, since it calls a method itself for each payload.
454  """
455  send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
456  if self._testing and not(send_payloads_response):
457  return False
458 
459  """
460  Final stage - send metadata to server (since the payloads are there now)
461  if this is successful, once it finished the upload session is closed on the server and the tag lock is released.
462  """
463  try:
464 
465  # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
466  send_metadata_response = self.send_metadata(self.upload_session_id)
467  no_error = self.check_response_for_error_key(send_metadata_response)
468  if not(no_error) and self._testing:
469  return False
470 
471  # we have to call this explicitly here since check_response_for_error_key only writes the log file
472  # if an error has occurred, whereas it should always be written here
473  self.write_server_side_log(self._log_data)
474 
475  except errors.NoMoreRetriesException as no_more_retries:
476  return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
477  except Exception as e:
478  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
479  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
480 
481  if not(self._verbose):
482  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
483  else:
484  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
485 
486  return self.exit_upload()
487 
488  # close client side log handle
489  self._handle.close()
490 
491  # if we're running the testing script, return True to say the upload has worked
492  if self._testing:
493  return True
494 

References uploads.uploader._outputter, uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.get_upload_session_id(), uploads.uploader.upload_session_id, and writeEcalDQMStatus.write.

◆ write_server_side_log()

def uploads.uploader.write_server_side_log (   self,
  log_data 
)
Given the log data from the server, write it to a client-side log file.

Definition at line 286 of file uploads.py.

286  def write_server_side_log(self, log_data):
287  """
288  Given the log data from the server, write it to a client-side log file.
289  """
290  # if the server_side_log directory doesn't exist, create it
291  # without it we can't write the log when we download it from the server
292  if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
293  os.makedirs("server_side_logs/")
294 
295  # directory exists now, write to client-side log file
296  server_log_file_name = None
297  try:
298  # if the upload session does not exist yet, don't try to write the log file
299  if self.upload_session_id == None:
300  raise Exception("No upload session")
301  # create a write handle to the file, decode the log data from base64, write and close
302  server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
303  handle = open(server_log_file_name, "w")
304  handle.write(base64.b64decode(log_data))
305  handle.close()
306  except Exception as e:
307  # reset log file name to None so we don't try to write it later
308  server_log_file_name = None
309  #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")
310 
311  # tell the user where the log files are
312  # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
313  if server_log_file_name != None:
314  print("Log file from server written to '%s'." % server_log_file_name)
315  else:
316  print("No server log file could be written locally.")
317 
318  print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)
319 

References print(), str, uploads.uploader.upload_log_file_name, and uploads.uploader.upload_session_id.

Referenced by uploads.uploader.check_response_for_error_key(), and uploads.uploader.exit_upload().

Member Data Documentation

◆ _debug

uploads.uploader._debug
private

◆ _handle

uploads.uploader._handle
private

Definition at line 108 of file uploads.py.

Referenced by uploads.uploader.exit_upload().

◆ _log_data

uploads.uploader._log_data
private

◆ _outputter

uploads.uploader._outputter
private

◆ _SERVICE_URL

uploads.uploader._SERVICE_URL
private

◆ _testing

uploads.uploader._testing
private

◆ _verbose

uploads.uploader._verbose
private

◆ data_to_send

uploads.uploader.data_to_send

◆ hashes_with_no_local_payload

uploads.uploader.hashes_with_no_local_payload

Definition at line 184 of file uploads.py.

◆ input_tag

uploads.uploader.input_tag

Definition at line 148 of file uploads.py.

◆ metadata_source

uploads.uploader.metadata_source

Definition at line 120 of file uploads.py.

Referenced by uploads.uploader.get_tag_dictionary().

◆ server_side_log_file

uploads.uploader.server_side_log_file

Definition at line 362 of file uploads.py.

◆ sqlite_file_name

uploads.uploader.sqlite_file_name

Definition at line 154 of file uploads.py.

Referenced by uploads.uploader.send_payloads().

◆ upload_log_file_name

uploads.uploader.upload_log_file_name

Definition at line 107 of file uploads.py.

Referenced by uploads.uploader.write_server_side_log().

◆ upload_session_id

uploads.uploader.upload_session_id
convertSQLitetoXML_cfg.output
output
Definition: convertSQLitetoXML_cfg.py:72
data_sources.json_data_node.make
def make(data)
Definition: data_sources.py:131
querying.connect
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:452
relativeConstraints.keys
keys
Definition: relativeConstraints.py:89
conddb_time.to_timestamp
def to_timestamp(dt)
Definition: conddb_time.py:13
str
#define str(s)
Definition: TestProcessor.cc:51
uploads.friendly_since
def friendly_since(time_type, since)
Definition: uploads.py:30
print
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:46
Exception
createfilelist.int
int
Definition: createfilelist.py:10
writeEcalDQMStatus.write
write
Definition: writeEcalDQMStatus.py:48
errors.NoMoreRetriesException
Definition: errors.py:19
data_sources.json_list
Definition: data_sources.py:175
data
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
url_query
Definition: url_query.py:1
alcaDQMUpload.upload
def upload(url, args, files)
Definition: alcaDQMUpload.py:65
genParticles_cff.map
map
Definition: genParticles_cff.py:11
beamvalidation.exit
def exit(msg="")
Definition: beamvalidation.py:53
uploads.new_log_file_id
def new_log_file_id()
Definition: uploads.py:47