List of all members | Public Member Functions | Public Attributes | Private Attributes
uploads.uploader Class Reference
Inheritance diagram for uploads.uploader:

Public Member Functions

def __init__
 
def check_response_for_error_key
 
def close_upload_session
 
def exit_upload
 
def filter_iovs_by_fcsr
 
def get_all_hashes
 
def get_fcsr_from_server
 
def get_hashes_to_send
 
def get_tag_dictionary
 
def get_upload_session_id
 
def send_blob
 
def send_metadata
 
def send_payloads
 
def upload
 
def write_server_side_log
 

Public Attributes

 data_to_send
 
 hashes_with_no_local_payload
 
 input_tag
 
 metadata_source
 
 server_side_log_file
 
 sqlite_file_name
 
 upload_log_file_name
 
 upload_session_id
 

Private Attributes

 _debug
 
 _handle
 
 _log_data
 
 _outputter
 
 _SERVICE_URL
 
 _testing
 
 _verbose
 

Detailed Description

Upload session controller - creates, tracks, and deletes upload sessions on the server.

Definition at line 81 of file uploads.py.

Constructor & Destructor Documentation

def uploads.uploader.__init__ (   self,
  metadata_source = None,
  debug = False,
  verbose = False,
  testing = False,
  server = "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/",
  **kwargs 
)
Upload constructor:
Given an SQLite file and a metadata source, reads the data into a dictionary ready for it to be encoded and uploaded.

Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

Note: default value of service_url should be changed for production.
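The kwargs note above can be illustrated with a minimal, standalone sketch (the function and argument names here are illustrative, not part of CondDBFW):

```python
def constructor(metadata_source=None, debug=False, verbose=False, **kwargs):
    # **kwargs captures stray keyword arguments: anything that does not
    # match a named parameter lands here instead of raising a TypeError,
    # and is simply never used.
    return {"debug": debug, "ignored": sorted(kwargs)}

# A stray option is silently captured rather than rejected.
result = constructor(debug=True, not_a_real_option=1)
```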

Definition at line 86 of file uploads.py.

86 
87  def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
88  """
89  Upload constructor:
90  Given an SQLite file and a metadata source, reads the data into a dictionary ready for it to be encoded and uploaded.
91 
92  Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
93 
94  Note: default value of service_url should be changed for production.
95  """
96  # set private variables
97  self._debug = debug
98  self._verbose = verbose
99  self._testing = testing
100  # initialise server-side log data as empty string - will be replaced when we get a response back from the server
101  self._log_data = ""
102  self._SERVICE_URL = server
103  self.upload_session_id = None
104 
105  # set up client-side log file
106  self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
107  self._handle = open(self.upload_log_file_name, "a")
108 
109  # set up client-side logging object
110  self._outputter = output(verbose=verbose, log_handle=self._handle)
111  self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)
112 
113  # expect a CondDBFW data_source object for metadata_source
114  if metadata_source == None:
115  # no upload metadata has been given - we cannot continue with the upload
116  self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
117  else:
118  # set up global metadata source variable
119  self.metadata_source = metadata_source.data()
120 
121  # check for the destination tag
122  # this is required whatever type of upload we're performing
123  if self.metadata_source.get("destinationTags") == None:
124  self.exit_upload("No destination Tag was given.")
125  else:
126  if type(self.metadata_source.get("destinationTags")) == dict and list(self.metadata_source.get("destinationTags").keys())[0] == None:
127  self.exit_upload("No destination Tag was given.")
128 
129  # make sure a destination database was given
130  if self.metadata_source.get("destinationDatabase") == None:
131  self.exit_upload("No destination database was given.")
132 
133  # get Conditions metadata
134  if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
135  """
136  If we have neither an sqlite file nor the command line data
137  """
138  self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
139  + "\nSee --help for command line argument information.")
140  elif self.metadata_source.get("sourceDB") != None:
141  """
142  We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
143  We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
144  """
145 
146  # make sure we have an input tag to look for in the source db
147  self.input_tag = metadata_source.data().get("inputTag")
148  if self.input_tag == None:
149  self.exit_upload("No input Tag name was given.")
150 
151  # set empty dictionary to contain Tag and IOV data from SQLite
152  result_dictionary = {}
153  self.sqlite_file_name = self.metadata_source["sourceDB"]
154  if not(os.path.isfile(self.sqlite_file_name)):
155  self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
156  sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))
157 
158  self._outputter.write("Getting Tag and IOVs from SQLite database.")
159 
160  # query for Tag, check for existence, then convert to dictionary
161  tag = sqlite_con.tag(name=self.input_tag)
162  if tag == None:
163  self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
164  tag = tag.as_dicts(convert_timestamps=True)
165 
166  # query for IOVs, check for existence, then convert to dictionaries
167  iovs = sqlite_con.iov(tag_name=self.input_tag)
168  if iovs == None:
169  self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
170  iovs = iovs.as_dicts(convert_timestamps=True)
171  iovs = [iovs] if type(iovs) != list else iovs
172 
173  """
174  Finally, get the list of all Payload hashes of IOVs,
175  then compute the list of hashes for which there is no local Payload;
176  this is used later to decide if we can continue the upload if the Payload was not found on the server.
177  """
178  iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
179  if iovs_for_hashes.__class__ == data_sources.json_list:
180  hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
181  else:
182  hashes_of_iovs = [iovs_for_hashes.payload_hash]
183  self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]
184 
185  # close session open on SQLite database file
186  sqlite_con.close_session()
187 
188  elif metadata_source.data().get("hashToUse") != None:
189  """
190  Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
191  We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
192  """
193 
194  # set empty dictionary to contain Tag and IOV data from command line
195  result_dictionary = {}
196 
197  now = to_timestamp(datetime.utcnow())
198  # tag dictionary will be taken from the server
199  # this does not require any authentication
200  tag = self.get_tag_dictionary()
202  iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
203  "insertion_time" : now}]
204 
205  # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
206  self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]
207 
208  # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side
209 
210  # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
211  if tag["time_type"] == "Run":
212  for (i, iov) in enumerate(iovs):
213  iovs[i]["since"] = iovs[i]["since"] << 32
214 
215  result_dictionary = {"inputTagData" : tag, "iovs" : iovs}
216 
217  # add command line arguments to dictionary
218  # remembering that metadata_source is a json_dict object
219  result_dictionary.update(metadata_source.data())
220 
221  # store in instance variable
222  self.data_to_send = result_dictionary
223 
224  # if the since doesn't exist, take the first since from the list of IOVs
225  if result_dictionary.get("since") == None:
226  result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
227  elif self.data_to_send["inputTagData"]["time_type"] == "Run":
228  # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
229  self.data_to_send["since"] = self.data_to_send["since"] << 32
230 
231  """
232  TODO - Settle on a single destination tag format.
233  """
234  # look for deprecated metadata entries - give warnings
235  # Note - we only really support this format
236  try:
237  if type(result_dictionary["destinationTags"]) == dict:
238  self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
239  except Exception as e:
240  self._outputter.write("ERROR: %s" % str(e))
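The run-to-lumi since conversion performed in the constructor (line 213) can be sketched on its own; the IOV values below are made up:

```python
def run_to_lumi(since):
    # Pack a run-based since into the lumi-based encoding: the run number
    # moves to the upper 32 bits, leaving lumi section 0 in the lower 32 bits.
    return since << 32

# Hypothetical run-based IOVs being converted for uniform processing.
iovs = [{"since": 283270}, {"since": 283271}]
iovs = [dict(iov, since=run_to_lumi(iov["since"])) for iov in iovs]
```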

Member Function Documentation

def uploads.uploader.check_response_for_error_key (   self,
  response_dict,
  exit_if_error = True 
)
Checks the decoded response of an HTTP request to the server.
If it is a dictionary, and one of its keys is "error", the server returned an error

Definition at line 248 of file uploads.py.

References EcalMatacqAnalyzer._debug, EcalABAnalyzer._debug, uploads.uploader._debug, EcalLaserAnalyzer2._debug, EcalLaserAnalyzer._debug, uploads.uploader._log_data, uploads.uploader._testing, beamvalidation.exit(), and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.exit_upload(), uploads.uploader.send_payloads(), and uploads.uploader.upload().

249  def check_response_for_error_key(self, response_dict, exit_if_error=True):
250  """
251  Checks the decoded response of an HTTP request to the server.
252  If it is a dictionary, and one of its keys is "error", the server returned an error
253  """
254  # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
255  if type(response_dict) == dict and "error" in list(response_dict.keys()):
256  splitter_string = "\n%s\n" % ("-"*50)
257  self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
258  self._outputter.write(response_dict["error"], ignore_verbose=True)
259 
260  # if the user has given the --debug flag, show the traceback as well
261  if self._debug:
262  # suggest to the user to email this to db upload experts
263  self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
264  if response_dict.get("traceback") != None:
265  self._outputter.write(response_dict["traceback"], ignore_verbose=True)
266  else:
267  self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
268  else:
269  self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)
270 
271  # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
272  # if an error has occurred on the server side, a log will have been written
273  self.write_server_side_log(response_dict.get("log_data"))
274 
275  if exit_if_error:
276  if self._testing:
277  return False
278  else:
279  exit()
280  elif not("error" in list(response_dict.keys())) and "log_data" in list(response_dict.keys()):
281  # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
282  self._log_data = response_dict["log_data"]
283  return True
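A minimal, standalone sketch of the error-key convention this method checks; the response dictionaries below are illustrative, not real server output:

```python
def response_has_error(response_dict):
    # A decoded server response signals failure when it is a dictionary
    # containing an "error" key; anything else is treated as success.
    return isinstance(response_dict, dict) and "error" in response_dict

failed = response_has_error({"error": "Tag is locked", "traceback": None})
succeeded = not response_has_error({"log_data": "server log contents"})
```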
def uploads.uploader.close_upload_session (   self,
  upload_session_id 
)
Close an upload session on the server by calling its close_upload_session end-point.
This is done if there is an error on the client-side.

Definition at line 520 of file uploads.py.

References uploads.uploader._SERVICE_URL, errors.check_response(), and uploads.uploader.data_to_send.

Referenced by uploads.uploader.exit_upload().

521  def close_upload_session(self, upload_session_id):
522  """
523  Close an upload session on the server by calling its close_upload_session end-point.
524  This is done if there is an error on the client-side.
525  """
526  self._outputter.write("An error occurred - closing the upload session on the server.")
527  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
528  query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
529  response = query.send()
530  return response
def uploads.uploader.exit_upload (   self,
  message = None 
)
Used to exit the script - which only happens if an error has occurred.
If the --testing flag was passed by the user, we should return False for failure, and not exit

Definition at line 318 of file uploads.py.

References uploads.uploader._log_data, uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.close_upload_session(), beamvalidation.exit(), print(), uploads.uploader.upload_session_id, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.filter_iovs_by_fcsr().

319  def exit_upload(self, message=None):
320  """
321  Used to exit the script - which only happens if an error has occurred.
322  If the --testing flag was passed by the user, we should return False for failure, and not exit
323  """
324  if self.upload_session_id != None:
325  # only try to close the upload session if an upload session has been obtained
326  response = self.close_upload_session(self.upload_session_id)
327  no_error = self.check_response_for_error_key(response)
328  # if no error was found in the upload session closure request,
329  # we still have to write the server side log
330  if no_error:
331  self.write_server_side_log(self._log_data)
332  # close client-side log handle
333  self._handle.close()
334  if message != None:
335  print("\n%s\n" % message)
336  if self._testing:
337  return False
338  else:
339  exit()
def uploads.uploader.filter_iovs_by_fcsr (   self,
  upload_session_id 
)
Ask the server for the FCSR based on the synchronization type of the source Tag.
Then, modify the IOVs (possibly remove some) based on the FCSR we received.
This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.

Definition at line 553 of file uploads.py.

References uploads.uploader.data_to_send, uploads.uploader.exit_upload(), uploads.friendly_since(), uploads.uploader.get_fcsr_from_server(), and conddb_time.to_timestamp().

554  def filter_iovs_by_fcsr(self, upload_session_id):
555  """
556  Ask the server for the FCSR based on the synchronization type of the source Tag.
557  Then, modify the IOVs (possibly remove some) based on the FCSR we received.
558  This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
559  """
560  self._outputter.write("Getting the First Condition Safe Run for the current sync type.")
561 
562  fcsr_data = self.get_fcsr_from_server(upload_session_id)
563  fcsr = fcsr_data["fcsr"]
564  fcsr_changed = fcsr_data["fcsr_changed"]
565  new_sync = fcsr_data["new_sync"]
566 
567  if fcsr_changed:
568  self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))
569 
570  self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
571  % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))
572 
573  """
574  There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
575  Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
576  Note: this applies to run, lumi and timestamp run_types.
577  """
578 
579  # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
580  if fcsr > self.data_to_send["since"]:
581  # check if we're uploading to offline sync - if so, then user since must be >= fcsr, so we should report an error
582  if self.data_to_send["fcsr_filter"].lower() == "offline":
583  self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
584  self.exit_upload()
585  self.data_to_send["since"] = fcsr
586 
587  self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
588  % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))
589 
590  """
591  Post validation processing assuming destination since is now valid.
592 
593  Because we don't have an sqlite database to query (everything's in a dictionary),
594  we have to go through the IOVs manually to find the greatest since that's less than
595  the destination since.
596 
597  Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
598  """
599 
600  max_since_below_dest = self.data_to_send["iovs"][0]["since"]
601  for (i, iov) in enumerate(self.data_to_send["iovs"]):
602  if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
603  max_since_below_dest = self.data_to_send["iovs"][i]["since"]
604 
605  # only select iovs that have sinces >= max_since_below_dest
606  # and then shift any IOVs left to the destination since
607  self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
608  for (i, iov) in enumerate(self.data_to_send["iovs"]):
609  if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
610  self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]
611 
612  # modify insertion_time of iovs
613  new_time = to_timestamp(datetime.utcnow())
614  for (i, iov) in enumerate(self.data_to_send["iovs"]):
615  self.data_to_send["iovs"][i]["insertion_time"] = new_time
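The hole-free IOV shifting described in the comment block (lines 590-598) can be sketched independently of the class; the sinces below are made up:

```python
def shift_iovs_to_fcsr(iovs, dest_since):
    # Greatest since that does not exceed the destination since; falls back
    # to the first IOV's since when none qualify.
    max_below = max((iov["since"] for iov in iovs if iov["since"] <= dest_since),
                    default=iovs[0]["since"])
    # Keep only IOVs from that point on, then pull any remaining early sinces
    # up to the destination since so no hole is left in Conditions coverage.
    kept = [dict(iov) for iov in iovs if iov["since"] >= max_below]
    for iov in kept:
        if iov["since"] < dest_since:
            iov["since"] = dest_since
    return kept

# IOVs at sinces 1, 5, 10, 20 with a destination since of 12: the IOV at 10
# is the last one covering 12, so it is kept and shifted up to 12.
result = shift_iovs_to_fcsr([{"since": s} for s in (1, 5, 10, 20)], dest_since=12)
```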
def uploads.uploader.get_all_hashes (   self)
Get all the hashes from the dictionary of IOVs we have from the SQLite file.

Definition at line 616 of file uploads.py.

References errors.check_response(), and uploads.uploader.data_to_send.

Referenced by uploads.uploader.get_hashes_to_send().

617  def get_all_hashes(self):
618  """
619  Get all the hashes from the dictionary of IOVs we have from the SQLite file.
620  """
621  self._outputter.write("\tGetting list of all hashes found in SQLite database.")
622  hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
623  return hashes
def uploads.uploader.get_fcsr_from_server (   self,
  upload_session_id 
)
Execute the HTTPs request to ask the server for the FCSR.

Note: we do this in a separate function so we can do the decoding check for json data with check_response.

Definition at line 532 of file uploads.py.

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, and relativeConstraints.keys.

Referenced by uploads.uploader.filter_iovs_by_fcsr().

533  def get_fcsr_from_server(self, upload_session_id):
534  """
535  Execute the HTTPs request to ask the server for the FCSR.
536 
537  Note: we do this in a separate function so we can do the decoding check for json data with check_response.
538  """
539  # tiny amount of client-side logic here - all of the work is done on the server
540  # tier0_response uses get() so if the key isn't present, we default to None
541  # tier0_response is for replaying uploads from the old upload service, with knowledge of the tier0 response
542  # when those uploads happened.
543  url_data = {
544  "database" : self.data_to_send["destinationDatabase"],
545  "upload_session_id" : upload_session_id,
546  "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
547  "sourceTagSync" : self.data_to_send["fcsr_filter"],
548  "tier0_response" : self.data_to_send.get("tier0_response")
549  }
550  query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
551  result = query.send()
552  return result
def uploads.uploader.get_hashes_to_send (   self,
  upload_session_id 
)
Get the hashes of the payloads we want to send that the server doesn't have yet.

Definition at line 625 of file uploads.py.

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, and uploads.uploader.get_all_hashes().

626  def get_hashes_to_send(self, upload_session_id):
627  """
628  Get the hashes of the payloads we want to send that the server doesn't have yet.
629  """
630  self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
631  post_data = json.dumps(self.get_all_hashes())
632  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
633  query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
634  response = query.send()
635  return response
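A sketch of how the check_hashes request is assembled, using only the standard library in place of CondDBFW's url_query helper; the database name, session token, and hashes below are made up:

```python
import json
from urllib.parse import urlencode

# Hypothetical values - in the real client these come from data_to_send
# and the open upload session.
hashes = ["aabbccdd", "11223344"]
post_data = json.dumps(hashes)  # request body: the full list of local hashes
url_data = {"database": "destinationDatabaseName", "upload_session_id": "token123"}
# Session token and database travel as URL parameters; hashes go in the body.
url = "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/check_hashes/?" + urlencode(url_data)
```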
def uploads.uploader.get_tag_dictionary (   self)

Definition at line 242 of file uploads.py.

References uploads.uploader._SERVICE_URL, and uploads.uploader.metadata_source.

243  def get_tag_dictionary(self):
244  url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
245  request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
246  response = request.send()
247  return response
def uploads.uploader.get_upload_session_id (   self)
Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
as long as the upload session is still open.

Definition at line 494 of file uploads.py.

References uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, alcaDQMUpload.encode(), and relativeConstraints.keys.

Referenced by uploads.uploader.upload().

495  def get_upload_session_id(self):
496  """
497  Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
498  as long as the upload session is still open.
499  """
500  self._outputter.write("Getting upload session.")
501 
502  # send password in the body so it can be encrypted over https
503  # username and password are taken from the netrc file
504  # at this point, the value in username_or_token is always a username, since
505  # this method's end result is obtaining a token.
506  body_data = base64.b64encode(json.dumps(
507  {
508  "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
509  "username_or_token" : self.data_to_send["username"],
510  "password" : self.data_to_send["password"]
511  }
512  ).encode('UTF-8'))
513 
514  url_data = {"database" : self.data_to_send["destinationDatabase"]}
515 
516  query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
517  response = query.send()
518  return response
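The body encoding used here can be sketched with the standard library; the credential values below are placeholders, not real ones (in CondDBFW they come from a netrc file):

```python
import base64
import json

# Placeholder credentials - never real values.
credentials = {
    "destinationTag": "ExampleTag_v1",
    "username_or_token": "exampleuser",
    "password": "examplepass",
}
# Base64-encode the JSON so the credentials travel in the request body
# (encrypted by HTTPS) rather than in the URL.
body_data = base64.b64encode(json.dumps(credentials).encode("UTF-8"))
```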
def uploads.uploader.send_blob (   self,
  payload,
  upload_session_id 
)
Send the BLOB of a payload over HTTP.
The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.

Definition at line 688 of file uploads.py.

References uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, str, and conddb_time.to_timestamp().

Referenced by uploads.uploader.send_payloads().

689  def send_blob(self, payload, upload_session_id):
690  """
691  Send the BLOB of a payload over HTTP.
692  The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
693  """
694  # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPs request
695  blob_data = base64.b64encode(payload["data"])
696 
697  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
698 
699  # construct the data to send in the body and header of the HTTPs request
700  for key in list(payload.keys()):
701  # skip blob
702  if key != "data":
703  if key == "insertion_time":
704  url_data[key] = to_timestamp(payload[key])
705  else:
706  url_data[key] = payload[key]
707 
708  request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)
709 
710  # send the request and return the response
711  # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
712  try:
713  request_response = request.send()
714  return request_response
715  except Exception as e:
716  # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
717  if type(e) == errors.NoMoreRetriesException:
718  self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
719  self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
720  return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})
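A standalone sketch of the BLOB/URL-parameter split performed by send_blob; the payload record below is made up:

```python
import base64

# Hypothetical payload record, shaped like the dictionaries send_blob receives.
payload = {"hash": "aabbccdd", "data": b"\x00\x01\x02binary-blob", "version": "v1"}

# Base64-encode only the BLOB so no byte can corrupt the HTTP request;
# every other column travels as a URL parameter.
blob_data = base64.b64encode(payload["data"])
url_data = {key: value for key, value in payload.items() if key != "data"}
```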
def uploads.uploader.send_metadata (   self,
  upload_session_id 
)
Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
The server closes the session (and releases the tag lock) after processing has been completed.

Definition at line 722 of file uploads.py.

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, relativeConstraints.keys, print(), and uploads.uploader.upload_session_id.

723  def send_metadata(self, upload_session_id):
724  """
725  Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
726  The server closes the session (and releases the tag lock) after processing has been completed.
727  """
728 
729  # set user text if it's empty
730  if self.data_to_send["userText"] in ["", None]:
731  self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % list(self.data_to_send["destinationTags"].keys())[0]
732 
733  self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
734  % self.upload_session_id)
735 
736  # send the HTTPs request to the server
737  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id, "tier0_response" : self.data_to_send.get("tier0_response")}
738  request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
739  response = request.send()
740  self._outputter.write("Response received - conditions upload process complete.")
741  return response
def uploads.uploader.send_payloads (   self,
  hashes,
  upload_session_id 
)
Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.

Definition at line 636 of file uploads.py.

References errors.check_response(), uploads.uploader.check_response_for_error_key(), querying.connect(), data_sources.json_data_node.make(), uploads.uploader.send_blob(), and uploads.uploader.sqlite_file_name.

637  def send_payloads(self, hashes, upload_session_id):
638  """
639  Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
640  """
641  # if we have no hashes, we can't send anything
642  # but don't exit since it might mean all the Payloads were already on the server
643  if len(hashes) == 0:
644  self._outputter.write("No hashes to send - moving to metadata upload.")
645  return True
646  else:
647  self._outputter.write("Sending payloads of hashes not found:")
648  # construct connection string for local SQLite database file
649  database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if type(self.sqlite_file_name) == str else self.sqlite_file_name
650  # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW)
651  self._outputter.write("\tConnecting to input SQLite database.")
652  con = querying.connect(database, map_blobs=True)
653 
654  # query for the Payloads
655  self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
656  byte_hashes = [bytes(h, 'utf-8') for h in hashes]
657  payloads = con.payload(hash=byte_hashes)
658  # if we get a single Payload back, put it in a list and turn it into a json_list
659  if payloads and payloads.__class__ != data_sources.json_list:
660  payloads = data_sources.json_data_node.make([payloads])
661 
662  # close the session with the SQLite database file - we won't use it again
663  con.close_session()
664 
665  # if found some Payloads, send them
666  if payloads:
667  # Note: there is an edge case in which Payloads could have been deleted from the
668  # SQLite file since we queried it for IOV hashes. This may be handled in the next iteration.
669  # send http post with data blob in body, and everything else as URL parameters
670  # convert Payload to a dictionary - we can put most of this into the URL of the HTTPs request
671  dicts = payloads.as_dicts()
672  self._outputter.write("Uploading Payload BLOBs:")
673 
674  # for each payload, send the BLOB to the server
675  for n, payload in enumerate(dicts):
676  self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
677  response = self.send_blob(payload, upload_session_id)
678  # check response for errors
679  no_error = self.check_response_for_error_key(response, exit_if_error=True)
680  if not(no_error):
681  return False
682  self._outputter.write("\tPayload sent - moving to next one.")
683  self._outputter.write("All Payloads uploaded.")
684  return True
685  else:
686  return False
def uploads.uploader.upload (   self)
Calls methods that send HTTP requests to the upload server.
Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.

Definition at line 340 of file uploads.py.

References uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.get_upload_session_id(), and uploads.uploader.upload_session_id.

341  def upload(self):
342  """
343  Calls methods that send HTTP requests to the upload server.
344  """
345 
346  """
347  Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
348  """
349  try:
350 
351  # get upload session, check response for error key
352  upload_session_data = self.get_upload_session_id()
353  no_error = self.check_response_for_error_key(upload_session_data)
354 
355  # if there was an error and we're testing, return False for the testing module
356  if not(no_error) and self._testing:
357  return False
358 
359  self.upload_session_id = upload_session_data["id"]
360  self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
361  self.server_side_log_file = upload_session_data["log_file"]
362 
363  except errors.NoMoreRetriesException as no_more_retries:
364  return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
365  except Exception as e:
366  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
367  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
368 
369  if not(self._verbose):
370  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
371  else:
372  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
373 
374  return self.exit_upload()
375 
376  """
377  Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
378  """
379  if self.data_to_send["fcsr_filter"] != None:
380  """
381  FCSR Filtering:
382  Filtering the IOVs before we send them by getting the First Conditions Safe Run
383  from the server based on the target synchronization type.
384  """
385  if self.data_to_send["inputTagData"]["time_type"] != "Time":
386  # only run-based tags can be FCSR-filtered; time-based tags are skipped here and on the server side
387  try:
389  # this function does not return a value, since it just operates on data - so no point checking for an error key
390  # the error key check is done inside the function on the response from the server
391  except errors.NoMoreRetriesException as no_more_retries:
392  return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
393  except Exception as e:
394  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
395  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
396 
397  if not(self._verbose):
398  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
399  else:
400  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
401 
402  return self.exit_upload()
403  else:
404  self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
405 
406  """
407  Check for the hashes that the server doesn't have - only send these (but in the next step).
408  """
409  try:
410 
411  check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
412  # check for an error key in the response
413  no_error = self.check_response_for_error_key(check_hashes_response)
414 
415  # if there was an error and we're testing, return False for the testing module
416  if not(no_error) and self._testing:
417  return False
418 
419  # finally, check hashes_not_found with hashes not found locally - if there is an intersection, we stop the upload
420  # because if a hash is not found and is not on the server, there is no data to upload
421  all_hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
422  hashes_not_found = check_hashes_response["hashes_not_found"]
423  hashes_found = list(set(all_hashes) - set(hashes_not_found))
424  self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
425  # check whether any hash not found on the server is used in the local SQLite database
426  for hash_not_found in hashes_not_found:
427  if hash_not_found in self.hashes_with_no_local_payload:
428  return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
429 
430  for hash_found in hashes_found:
431  if hash_found in self.hashes_with_no_local_payload:
432  self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)
433 
434  self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")
435 
436  except errors.NoMoreRetriesException as no_more_retries:
437  # for now, just write the log if we get a NoMoreRetriesException
438  return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
439  except Exception as e:
440  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
441  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
442 
443  if not(self._verbose):
444  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
445  else:
446  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
447 
448  return self.exit_upload()
449 
450  """
451  Send the Payloads the server told us about in the previous step (returned from get_hashes_to_send).
452  Exception handling is done inside this method, since it calls another method for each payload.
453  """
454  send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
455  if self._testing and not(send_payloads_response):
456  return False
457 
458  """
459  Final stage: send the metadata to the server (now that the Payloads are there).
460  If this succeeds, the upload session is closed on the server and the tag lock is released.
461  """
462  try:
463 
464  # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
465  send_metadata_response = self.send_metadata(self.upload_session_id)
466  no_error = self.check_response_for_error_key(send_metadata_response)
467  if not(no_error) and self._testing:
468  return False
469 
470  # we have to call this explicitly here since check_response_for_error_key only writes the log file
471  # if an error has occurred, whereas it should always be written here
473 
474  except errors.NoMoreRetriesException as no_more_retries:
475  return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
476  except Exception as e:
477  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
478  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
479 
480  if not(self._verbose):
481  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
482  else:
483  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
484 
485  return self.exit_upload()
486 
487  # close client side log handle
488  self._handle.close()
489 
490  # if we're running the testing script, return True to say the upload has worked
491  if self._testing:
492  return True
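The whole `upload()` flow above reduces to a few sequential steps. The sketch below assumes a hypothetical `client` object whose methods mirror the uploader's own; session retries, FCSR filtering, and logging are elided.

```python
# Condensed sketch of the upload() pipeline; 'client' is a hypothetical
# object whose methods mirror the uploader's own.
def upload_sketch(client):
    # 1. open an upload session (this also acquires the tag lock)
    session = client.get_upload_session_id()
    if "error" in session:
        return False
    session_id = session["id"]
    # 2. ask the server which payload hashes it is missing
    missing = client.get_hashes_to_send(session_id)["hashes_not_found"]
    # 3. send only the missing payload BLOBs
    if not client.send_payloads(missing, session_id):
        return False
    # 4. send the metadata; on success the server closes the
    #    session and releases the tag lock
    return "error" not in client.send_metadata(session_id)
```

Sending only the hashes the server reports as missing keeps re-uploads of mostly-duplicated tags cheap: unchanged Payloads are never transferred twice.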
def uploads.uploader.write_server_side_log (   self,
  log_data 
)
Given the log data from the server, write it to a client-side log file.

Definition at line 284 of file uploads.py.

References print(), str, uploads.uploader.upload_log_file_name, and uploads.uploader.upload_session_id.

Referenced by uploads.uploader.check_response_for_error_key(), and uploads.uploader.exit_upload().

285  def write_server_side_log(self, log_data):
286  """
287  Given the log data from the server, write it to a client-side log file.
288  """
289  # if the server_side_log directory doesn't exist, create it
290  # without it we can't write the log when we download it from the server
291  if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
292  os.makedirs("server_side_logs/")
293 
294  # directory exists now, write to client-side log file
295  server_log_file_name = None
296  try:
297  # if the upload session does not exist yet, don't try to write the log file
298  if self.upload_session_id == None:
299  raise Exception("No upload session")
300  # create a write handle to the file, decode the log data from base64, write and close
301  server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
302  handle = open(server_log_file_name, "wb")  # binary mode - b64decode returns bytes
303  handle.write(base64.b64decode(log_data))
304  handle.close()
305  except Exception as e:
306  # reset log file name to None so we don't try to write it later
307  server_log_file_name = None
308  #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")
309 
310  # tell the user where the log files are
311  # in a future iteration we may just merge the log files and store a single log (as is done in the plotter module)
312  if server_log_file_name != None:
313  print("Log file from server written to '%s'." % server_log_file_name)
314  else:
315  print("No server log file could be written locally.")
316 
317  print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)

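The directory-creation-plus-decode steps in `write_server_side_log` can be written more compactly. The sketch below is not the module's actual code: it opens the file in binary mode, since `base64.b64decode` returns bytes under Python 3, and uses `os.makedirs(..., exist_ok=True)` instead of an `os.path.exists` check.

```python
import base64
import os

def write_server_log_sketch(log_data_b64, session_id, log_dir="server_side_logs"):
    # create the directory if it doesn't exist (no race-prone exists() check)
    os.makedirs(log_dir, exist_ok=True)
    path = os.path.join(log_dir, "upload_log_%s" % session_id)
    # decode the base64 payload from the server and write it as raw bytes
    with open(path, "wb") as handle:
        handle.write(base64.b64decode(log_data_b64))
    return path
```

The `with` block guarantees the handle is closed even if the decode fails, so no explicit `handle.close()` is needed.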
Member Data Documentation

uploads.uploader._debug
private

Definition at line 96 of file uploads.py.

Referenced by FrontierCondition_GT_autoExpress_cfi.Tier0Handler._queryTier0DataSvc(), tier0.Tier0Handler._queryTier0DataSvc(), uploads.uploader.check_response_for_error_key(), FrontierCondition_GT_autoExpress_cfi.Tier0Handler.setDebug(), tier0.Tier0Handler.setDebug(), FrontierCondition_GT_autoExpress_cfi.Tier0Handler.unsetDebug(), and tier0.Tier0Handler.unsetDebug().

uploads.uploader._handle
private

Definition at line 106 of file uploads.py.

uploads.uploader._log_data
private

Definition at line 100 of file uploads.py.

Referenced by uploads.uploader.check_response_for_error_key(), and uploads.uploader.exit_upload().

uploads.uploader._outputter
private

Definition at line 109 of file uploads.py.

uploads.uploader._SERVICE_URL
private

Definition at line 101 of file uploads.py.

Referenced by uploads.uploader.close_upload_session(), uploads.uploader.get_fcsr_from_server(), uploads.uploader.get_hashes_to_send(), uploads.uploader.get_tag_dictionary(), uploads.uploader.get_upload_session_id(), uploads.uploader.send_blob(), and uploads.uploader.send_metadata().

uploads.uploader._testing
private

Definition at line 98 of file uploads.py.

Referenced by uploads.uploader.check_response_for_error_key(), uploads.uploader.exit_upload(), and uploads.uploader.upload().

uploads.uploader._verbose
private

Definition at line 97 of file uploads.py.

Referenced by helpers.CloneSequenceVisitor.clonedSequence(), helpers.CloneTaskVisitor.clonedTask(), MassReplace.MassSearchReplaceAnyInputTagVisitor.doIt(), MassReplace.MassSearchReplaceParamVisitor.doIt(), and ConfigBuilder.ConfigBuilder.MassSearchReplaceProcessNameVisitor.doIt().

uploads.uploader.data_to_send

Definition at line 221 of file uploads.py.

Referenced by uploads.uploader.close_upload_session(), uploads.uploader.filter_iovs_by_fcsr(), uploads.uploader.get_all_hashes(), uploads.uploader.get_fcsr_from_server(), uploads.uploader.get_hashes_to_send(), uploads.uploader.get_upload_session_id(), uploads.uploader.send_blob(), and uploads.uploader.send_metadata().

uploads.uploader.hashes_with_no_local_payload

Definition at line 182 of file uploads.py.

uploads.uploader.input_tag

Definition at line 146 of file uploads.py.

uploads.uploader.metadata_source

Definition at line 118 of file uploads.py.

Referenced by uploads.uploader.get_tag_dictionary().

uploads.uploader.server_side_log_file

Definition at line 360 of file uploads.py.

uploads.uploader.sqlite_file_name

Definition at line 152 of file uploads.py.

Referenced by uploads.uploader.send_payloads().

uploads.uploader.upload_log_file_name

Definition at line 105 of file uploads.py.

Referenced by uploads.uploader.write_server_side_log().

uploads.uploader.upload_session_id

Definition at line 102 of file uploads.py.

Referenced by uploads.uploader.exit_upload(), uploads.uploader.send_metadata(), uploads.uploader.upload(), and uploads.uploader.write_server_side_log().