List of all members | Public Member Functions | Public Attributes | Private Attributes
uploads.uploader Class Reference
Inheritance diagram for uploads.uploader:

Public Member Functions

def __init__
 
def check_response_for_error_key
 
def close_upload_session
 
def exit_upload
 
def filter_iovs_by_fcsr
 
def get_all_hashes
 
def get_fcsr_from_server
 
def get_hashes_to_send
 
def get_tag_dictionary
 
def get_upload_session_id
 
def send_blob
 
def send_metadata
 
def send_payloads
 
def upload
 
def write_server_side_log
 

Public Attributes

 data_to_send
 
 hashes_with_no_local_payload
 
 input_tag
 
 metadata_source
 
 server_side_log_file
 
 sqlite_file_name
 
 upload_log_file_name
 
 upload_session_id
 

Private Attributes

 _debug
 
 _handle
 
 _log_data
 
 _outputter
 
 _SERVICE_URL
 
 _testing
 
 _verbose
 

Detailed Description

Upload session controller - creates, tracks, and deletes upload sessions on the server.

Definition at line 83 of file uploads.py.

Constructor & Destructor Documentation

def uploads.uploader.__init__ (   self,
  metadata_source = None,
  debug = False,
  verbose = False,
  testing = False,
  server = "https://cms-conddb-dev.cern.ch/cmsDbCondUpload/",
  kwargs 
)
Upload constructor:
Given an SQLite file and a metadata source, reads the data into a dictionary ready to be encoded and uploaded.

Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

Note: default value of service_url should be changed for production.

Definition at line 88 of file uploads.py.

88 
89  def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
90  """
91  Upload constructor:
92  Given an SQLite file and a metadata source, reads the data into a dictionary ready to be encoded and uploaded.
93 
94  Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
95 
96  Note: default value of service_url should be changed for production.
97  """
98  # set private variables
99  self._debug = debug
100  self._verbose = verbose
101  self._testing = testing
102  # initialise server-side log data as empty string - will be replaced when we get a response back from the server
103  self._log_data = ""
104  self._SERVICE_URL = server
105  self.upload_session_id = None
106 
107  # set up client-side log file
108  self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
109  self._handle = open(self.upload_log_file_name, "a")
110 
111  # set up client-side logging object
112  self._outputter = output(verbose=verbose, log_handle=self._handle)
113  self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)
114 
115  # expect a CondDBFW data_source object for metadata_source
116  if metadata_source == None:
117  # no upload metadata has been given - we cannot continue with the upload
118  self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
119  else:
120  # set up global metadata source variable
121  self.metadata_source = metadata_source.data()
122 
123  # check for the destination tag
124  # this is required whatever type of upload we're performing
125  if self.metadata_source.get("destinationTags") == None:
126  self.exit_upload("No destination Tag was given.")
127  else:
128  if type(self.metadata_source.get("destinationTags")) == dict and self.metadata_source.get("destinationTags").keys()[0] == None:
129  self.exit_upload("No destination Tag was given.")
130 
131  # make sure a destination database was given
132  if self.metadata_source.get("destinationDatabase") == None:
133  self.exit_upload("No destination database was given.")
134 
135  # get Conditions metadata
136  if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
137  """
138  If we have neither an sqlite file nor the command line data
139  """
140  self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
141  + "\nSee --help for command line argument information.")
142  elif self.metadata_source.get("sourceDB") != None:
143  """
144  We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
145  We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
146  """
147 
148  # make sure we have an input tag to look for in the source db
149  self.input_tag = metadata_source.data().get("inputTag")
150  if self.input_tag == None:
151  self.exit_upload("No input Tag name was given.")
152 
153  # set empty dictionary to contain Tag and IOV data from SQLite
154  result_dictionary = {}
155  self.sqlite_file_name = self.metadata_source["sourceDB"]
156  if not(os.path.isfile(self.sqlite_file_name)):
157  self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
158  sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))
159 
160  self._outputter.write("Getting Tag and IOVs from SQLite database.")
161 
162  # query for Tag, check for existence, then convert to dictionary
163  tag = sqlite_con.tag(name=self.input_tag)
164  if tag == None:
165  self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
166  tag = tag.as_dicts(convert_timestamps=True)
167 
168  # query for IOVs, check for existence, then convert to dictionaries
169  iovs = sqlite_con.iov(tag_name=self.input_tag)
170  if iovs == None:
171  self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
172  iovs = iovs.as_dicts(convert_timestamps=True)
173  iovs = [iovs] if type(iovs) != list else iovs
174 
175  """
176  Finally, get the list of all Payload hashes from the IOVs,
177  then compute the list of hashes for which there is no local Payload;
178  this is used later to decide if we can continue the upload if the Payload was not found on the server.
179  """
180  iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
181  if iovs_for_hashes.__class__ == data_sources.json_list:
182  hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
183  else:
184  hashes_of_iovs = [iovs_for_hashes.payload_hash]
185  self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]
186 
187  # close session open on SQLite database file
188  sqlite_con.close_session()
189 
190  elif metadata_source.data().get("hashToUse") != None:
191  """
192  Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
193  We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
194  """
195 
196  # set empty dictionary to contain Tag and IOV data from command line
197  result_dictionary = {}
198 
199  now = to_timestamp(datetime.now())
200  # tag dictionary will be taken from the server
201  # this does not require any authentication
202  tag = self.get_tag_dictionary()
204  iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
205  "insertion_time" : now}]
206 
207  # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
208  self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]
209 
210  # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side
211 
212  # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
213  if tag["time_type"] == "Run":
214  for (i, iov) in enumerate(iovs):
215  iovs[i]["since"] = iovs[i]["since"] << 32
216 
217  result_dictionary = {"inputTagData" : tag, "iovs" : iovs}
218 
219  # add command line arguments to dictionary
220  # remembering that metadata_source is a json_dict object
221  result_dictionary.update(metadata_source.data())
222 
223  # store in instance variable
224  self.data_to_send = result_dictionary
225 
226  # if the since doesn't exist, take the first since from the list of IOVs
227  if result_dictionary.get("since") == None:
228  result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
229  elif self.data_to_send["inputTagData"]["time_type"] == "Run":
230  # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
231  self.data_to_send["since"] = self.data_to_send["since"] << 32
232 
233  """
234  TODO - Settle on a single destination tag format.
235  """
236  # look for deprecated metadata entries - give warnings
237  # Note - we only really support this format
238  try:
239  if type(result_dictionary["destinationTags"]) == dict:
240  self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
241  except Exception as e:
242  self._outputter.write("ERROR: %s" % str(e))

Member Function Documentation

def uploads.uploader.check_response_for_error_key (   self,
  response_dict,
  exit_if_error = True 
)
Checks the decoded response of an HTTP request to the server.
If it is a dictionary, and one of its keys is "error", the server returned an error

Definition at line 250 of file uploads.py.

References EcalMatacqAnalyzer._debug, EcalABAnalyzer._debug, EcalLaserAnalyzer2._debug, uploads.uploader._debug, EcalLaserAnalyzer._debug, uploads.uploader._log_data, uploads.uploader._testing, cmsRelvalreport.exit, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.exit_upload(), uploads.uploader.send_payloads(), and uploads.uploader.upload().

251  def check_response_for_error_key(self, response_dict, exit_if_error=True):
252  """
253  Checks the decoded response of an HTTP request to the server.
254  If it is a dictionary, and one of its keys is "error", the server returned an error
255  """
256  # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
257  if type(response_dict) == dict and "error" in response_dict.keys():
258  splitter_string = "\n%s\n" % ("-"*50)
259  self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
260  self._outputter.write(response_dict["error"], ignore_verbose=True)
261 
262  # if the user has given the --debug flag, show the traceback as well
263  if self._debug:
264  # suggest to the user to email this to db upload experts
265  self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
266  if response_dict.get("traceback") != None:
267  self._outputter.write(response_dict["traceback"], ignore_verbose=True)
268  else:
269  self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
270  else:
271  self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)
272 
273  # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
274  # if an error has occurred on the server side, a log will have been written
275  self.write_server_side_log(response_dict.get("log_data"))
276 
277  if exit_if_error:
278  if self._testing:
279  return False
280  else:
281  exit()
282  elif not("error" in response_dict.keys()) and "log_data" in response_dict.keys():
283  # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
284  self._log_data = response_dict["log_data"]
285  return True
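The core test in this method is small: a decoded response is treated as an error only when it is a dictionary containing an `"error"` key. A minimal sketch, with an illustrative function name (`response_has_error` is not the CondDBFW API):

```python
# Sketch of the error-key check described above: only a dictionary carrying
# an "error" key counts as a server-reported error.

def response_has_error(response_dict):
    """Return True if a decoded server response carries an 'error' key."""
    return isinstance(response_dict, dict) and "error" in response_dict
```

Anything else (a plain string body, or a dictionary with only `log_data`) falls through to the success branch, where the log data is cached in memory for later use.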
def uploads.uploader.close_upload_session (   self,
  upload_session_id 
)
Close an upload session on the server by calling its close_upload_session end-point.
This is done if there is an error on the client-side.

Definition at line 522 of file uploads.py.

References uploads.uploader._SERVICE_URL, errors.check_response(), and uploads.uploader.data_to_send.

Referenced by uploads.uploader.exit_upload().

523  def close_upload_session(self, upload_session_id):
524  """
525  Close an upload session on the server by calling its close_upload_session end-point.
526  This is done if there is an error on the client-side.
527  """
528  self._outputter.write("An error occurred - closing the upload session on the server.")
529  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
530  query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
531  response = query.send()
532  return response
def uploads.uploader.exit_upload (   self,
  message = None 
)
Used to exit the script - which only happens if an error has occurred.
If the --testing flag was passed by the user, we should return False for failure, and not exit

Definition at line 320 of file uploads.py.

References uploads.uploader._log_data, uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.close_upload_session(), cmsRelvalreport.exit, reco.print(), uploads.uploader.upload_session_id, and uploads.uploader.write_server_side_log().

Referenced by uploads.uploader.filter_iovs_by_fcsr().

321  def exit_upload(self, message=None):
322  """
323  Used to exit the script - which only happens if an error has occurred.
324  If the --testing flag was passed by the user, we should return False for failure, and not exit
325  """
326  if self.upload_session_id != None:
327  # only try to close the upload session if an upload session has been obtained
328  response = self.close_upload_session(self.upload_session_id)
329  no_error = self.check_response_for_error_key(response)
330  # if no error was found in the upload session closure request,
331  # we still have to write the server side log
332  if no_error:
333  self.write_server_side_log(self._log_data)
334  # close client-side log handle
335  self._handle.close()
336  if message != None:
337  print("\n%s\n" % message)
338  if self._testing:
339  return False
340  else:
341  exit()
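The exit-vs-return convention used here recurs throughout the uploader: under `--testing` the method returns `False` so a test harness can assert on failure, otherwise the process exits. A hedged sketch of that pattern (`finish_with_error` is an illustrative name):

```python
# Sketch of the --testing exit convention: return False under testing so the
# caller can assert on failure; otherwise terminate the process.

def finish_with_error(message, testing):
    if message is not None:
        print("\n%s\n" % message)
    if testing:
        return False
    raise SystemExit(1)
```

Returning a sentinel instead of exiting keeps the upload flow testable, since `upload()` propagates the `False` back to the testing module rather than killing the interpreter.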
def uploads.uploader.filter_iovs_by_fcsr (   self,
  upload_session_id 
)
Ask the server for the FCSR based on the synchronization type of the source Tag.
Then, modify the IOVs (possibly remove some) based on the FCSR we received.
This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.

Definition at line 551 of file uploads.py.

References uploads.uploader.data_to_send, uploads.uploader.exit_upload(), alcazmumu_cfi.filter, uploads.friendly_since(), uploads.uploader.get_fcsr_from_server(), and utils.to_timestamp().

552  def filter_iovs_by_fcsr(self, upload_session_id):
553  """
554  Ask the server for the FCSR based on the synchronization type of the source Tag.
555  Then, modify the IOVs (possibly remove some) based on the FCSR we received.
556  This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
557  """
558  self._outputter.write("Getting the First Condition Safe Run for the current sync type.")
559 
560  fcsr_data = self.get_fcsr_from_server(upload_session_id)
561  fcsr = fcsr_data["fcsr"]
562  fcsr_changed = fcsr_data["fcsr_changed"]
563  new_sync = fcsr_data["new_sync"]
564 
565  if fcsr_changed:
566  self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))
567 
568  self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
569  % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))
570 
571  """
572  There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
573  Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
574  Note: this applies to run, lumi and timestamp run_types.
575  """
576 
577  # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
578  if fcsr > self.data_to_send["since"]:
579  # check if we're uploading to offline sync - if so, then user since must be >= fcsr, so we should report an error
580  if self.data_to_send["fcsr_filter"].lower() == "offline":
581  self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
582  self.exit_upload()
583  self.data_to_send["since"] = fcsr
584 
585  self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
586  % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))
587 
588  """
589  Post validation processing assuming destination since is now valid.
590 
591  Because we don't have an sqlite database to query (everything's in a dictionary),
592  we have to go through the IOVs manually to find the greatest since that's less than
593  the destination since.
594 
595  Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
596  """
597 
598  max_since_below_dest = self.data_to_send["iovs"][0]["since"]
599  for (i, iov) in enumerate(self.data_to_send["iovs"]):
600  if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
601  max_since_below_dest = self.data_to_send["iovs"][i]["since"]
602 
603  # only select iovs that have sinces >= max_since_below_dest
604  # and then shift any IOVs left to the destination since
605  self.data_to_send["iovs"] = filter(lambda iov : iov["since"] >= max_since_below_dest, self.data_to_send["iovs"])
606  for (i, iov) in enumerate(self.data_to_send["iovs"]):
607  if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
608  self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]
609 
610  # modify insertion_time of iovs
611  new_time = to_timestamp(datetime.now())
612  for (i, iov) in enumerate(self.data_to_send["iovs"]):
613  self.data_to_send["iovs"][i]["insertion_time"] = new_time
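The post-validation step above (keep the latest IOV at or below the destination since so coverage has no hole, drop earlier ones, clamp the rest up to it) can be sketched as a standalone function under the listing's own assumptions. `shift_iovs_to_fcsr` is an illustrative name; note also that the listing's `filter(...)` call is Python-2 style, while a list comprehension stays indexable under Python 3 as well:

```python
# Sketch of the FCSR IOV-shifting algorithm: find the greatest since at or
# below the destination since, keep only IOVs from that point on, and raise
# any remaining earlier sinces to the destination since.

def shift_iovs_to_fcsr(iovs, dest_since):
    max_since_below = max(
        (iov["since"] for iov in iovs if iov["since"] <= dest_since),
        default=iovs[0]["since"],
    )
    # list comprehension instead of filter() so the result is a list in Python 3
    kept = [dict(iov) for iov in iovs if iov["since"] >= max_since_below]
    for iov in kept:
        if iov["since"] < dest_since:
            iov["since"] = dest_since
    return kept
```

For example, with sinces `[1, 5, 9]` and a destination since of 7, the IOV at 5 is the latest one still covering 7, so it is kept and clamped to 7, the IOV at 1 is dropped, and the IOV at 9 is untouched.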
def uploads.uploader.get_all_hashes (   self)
Get all the hashes from the dictionary of IOVs we have from the SQLite file.

Definition at line 614 of file uploads.py.

References errors.check_response(), and uploads.uploader.data_to_send.

Referenced by uploads.uploader.get_hashes_to_send().

615  def get_all_hashes(self):
616  """
617  Get all the hashes from the dictionary of IOVs we have from the SQLite file.
618  """
619  self._outputter.write("\tGetting list of all hashes found in SQLite database.")
620  hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
621  return hashes
def uploads.uploader.get_fcsr_from_server (   self,
  upload_session_id 
)
Execute the HTTPs request to ask the server for the FCSR.

Note: we do this in a separate function so we can do the decoding check for json data with check_response.

Definition at line 534 of file uploads.py.

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, and relativeConstraints.keys.

Referenced by uploads.uploader.filter_iovs_by_fcsr().

535  def get_fcsr_from_server(self, upload_session_id):
536  """
537  Execute the HTTPs request to ask the server for the FCSR.
538 
539  Note: we do this in a separate function so we can do the decoding check for json data with check_response.
540  """
541  # tiny amount of client-side logic here - all of the work is done on the server
542  url_data = {
543  "database" : self.data_to_send["destinationDatabase"],
544  "upload_session_id" : upload_session_id,
545  "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
546  "sourceTagSync" : self.data_to_send["fcsr_filter"]
547  }
548  query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
549  result = query.send()
550  return result
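A sketch of the `get_fcsr/` query parameters assembled client-side. The listing's `dict.keys()[0]` indexing is Python-2-only; `next(iter(...))` is the portable way to take the first destination Tag name. `fcsr_request_params` is an illustrative name:

```python
# Sketch of the URL parameters for the get_fcsr/ end-point, mirroring the
# listing's url_data dictionary with a Python-3-safe first-key access.

def fcsr_request_params(data_to_send, upload_session_id):
    return {
        "database": data_to_send["destinationDatabase"],
        "upload_session_id": upload_session_id,
        "destinationTag": next(iter(data_to_send["destinationTags"])),
        "sourceTagSync": data_to_send["fcsr_filter"],
    }
```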
def uploads.uploader.get_hashes_to_send (   self,
  upload_session_id 
)
Get the hashes of the payloads we want to send that the server doesn't have yet.

Definition at line 623 of file uploads.py.

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, and uploads.uploader.get_all_hashes().

624  def get_hashes_to_send(self, upload_session_id):
625  """
626  Get the hashes of the payloads we want to send that the server doesn't have yet.
627  """
628  self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
629  post_data = json.dumps(self.get_all_hashes())
630  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
631  query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
632  response = query.send()
633  return response
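A sketch of how the `check_hashes/` request body is prepared: the hash list is serialized as JSON. The listing builds the list with `map()`, which in Python 3 returns an iterator that `json.dumps` cannot encode, so a list comprehension is the safe form in both versions. `all_payload_hashes` is an illustrative name:

```python
# Sketch of serialising the payload-hash list for the check_hashes/ request.
import json

def all_payload_hashes(iovs):
    # list comprehension (not map) so json.dumps works under Python 3 too
    return [iov["payload_hash"] for iov in iovs]

post_data = json.dumps(all_payload_hashes(
    [{"payload_hash": "a1"}, {"payload_hash": "b2"}]
))
```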
def uploads.uploader.get_tag_dictionary (   self)

Definition at line 244 of file uploads.py.

References uploads.uploader._SERVICE_URL, and uploads.uploader.metadata_source.

245  def get_tag_dictionary(self):
246  url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
247  request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
248  response = request.send()
249  return response
def uploads.uploader.get_upload_session_id (   self)
Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
as long as the upload session is still open.

Definition at line 496 of file uploads.py.

References uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, and relativeConstraints.keys.

Referenced by uploads.uploader.upload().

497  def get_upload_session_id(self):
498  """
499  Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
500  as long as the upload session is still open.
501  """
502  self._outputter.write("Getting upload session.")
503 
504  # send password in the body so it can be encrypted over https
505  # username and password are taken from the netrc file
506  # at this point, the value in username_or_token is always a username, since
507  # this method's end result is obtaining a token.
508  body_data = base64.b64encode(json.dumps(
509  {
510  "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
511  "username_or_token" : self.data_to_send["username"],
512  "password" : self.data_to_send["password"]
513  }
514  ))
515 
516  url_data = {"database" : self.data_to_send["destinationDatabase"]}
517 
518  query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
519  response = query.send()
520  return response
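The credentials body is JSON-encoded and then base64-encoded so it travels in the HTTPS request body rather than the URL. A hedged sketch of that preparation; `encode_session_body` is an illustrative name, though the field names match the listing:

```python
# Sketch of preparing the get_upload_session/ request body: JSON-encode the
# credentials, then base64 the result for safe transport in the HTTPS body.
import base64
import json

def encode_session_body(destination_tag, username, password):
    body = json.dumps({
        "destinationTag": destination_tag,
        "username_or_token": username,
        "password": password,
    })
    return base64.b64encode(body.encode("utf-8"))

encoded = encode_session_body("TestTag_v1", "user", "secret")
```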
def uploads.uploader.send_blob (   self,
  payload,
  upload_session_id 
)
Send the BLOB of a payload over HTTP.
The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.

Definition at line 685 of file uploads.py.

References uploads.uploader._SERVICE_URL, errors.check_response(), uploads.uploader.data_to_send, and utils.to_timestamp().

Referenced by uploads.uploader.send_payloads().

686  def send_blob(self, payload, upload_session_id):
687  """
688  Send the BLOB of a payload over HTTP.
689  The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
690  """
691  # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPs request
692  blob_data = base64.b64encode(payload["data"])
693 
694  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
695 
696  # construct the data to send in the body and header of the HTTPs request
697  for key in payload.keys():
698  # skip blob
699  if key != "data":
700  if key == "insertion_time":
701  url_data[key] = to_timestamp(payload[key])
702  else:
703  url_data[key] = payload[key]
704 
705  request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)
706 
707  # send the request and return the response
708  # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
709  try:
710  request_response = request.send()
711  return request_response
712  except Exception as e:
713  # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
714  if type(e) == errors.NoMoreRetriesException:
715  self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
716  self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
717  return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})
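The split performed here (binary BLOB base64-encoded into the request body, every other Payload column as a URL parameter) can be sketched as follows. `split_payload_for_request` is an illustrative name, and the `insertion_time` timestamp conversion from the listing is omitted:

```python
# Sketch of the BLOB/metadata split used when sending a Payload: base64 the
# binary data for the body, keep the remaining columns as URL parameters.
import base64

def split_payload_for_request(payload):
    body = base64.b64encode(payload["data"])
    url_data = {key: value for key, value in payload.items() if key != "data"}
    return url_data, body

url_data, body = split_payload_for_request(
    {"hash": "a1b2", "data": b"\x00\x01blob", "version": "v1"}
)
```

Base64-encoding the BLOB ensures no byte in the conditions data can corrupt the HTTPS request, at the cost of roughly a third more bytes on the wire.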
def uploads.uploader.send_metadata (   self,
  upload_session_id 
)
Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
The server closes the session (and releases the tag lock) after processing has been completed.

Definition at line 719 of file uploads.py.

References uploads.uploader._SERVICE_URL, uploads.uploader.data_to_send, relativeConstraints.keys, reco.print(), and uploads.uploader.upload_session_id.

720  def send_metadata(self, upload_session_id):
721  """
722  Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
723  The server closes the session (and releases the tag lock) after processing has been completed.
724  """
725 
726  # set user text if it's empty
727  if self.data_to_send["userText"] in ["", None]:
728  self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % self.data_to_send["destinationTags"].keys()[0]
729 
730  self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
731  % self.upload_session_id)
732 
733  # sent the HTTPs request to the server
734  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
735  request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
736  response = request.send()
737  self._outputter.write("Response received - conditions upload process complete.")
738  return response
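Before the metadata is sent, an empty or missing `userText` is replaced with a standard message naming the first destination Tag. A sketch of that fallback; `default_user_text` is an illustrative name:

```python
# Sketch of the userText fallback applied in send_metadata: empty or None
# comments are replaced with a standard message naming the destination Tag.

def default_user_text(user_text, destination_tag):
    if user_text in ("", None):
        return "Tag '%s' uploaded from CondDBFW client." % destination_tag
    return user_text
```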
def uploads.uploader.send_payloads (   self,
  hashes,
  upload_session_id 
)
Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.

Definition at line 634 of file uploads.py.

References errors.check_response(), uploads.uploader.check_response_for_error_key(), querying.connect(), data_sources.json_data_node.make(), uploads.uploader.send_blob(), and uploads.uploader.sqlite_file_name.

635  def send_payloads(self, hashes, upload_session_id):
636  """
637  Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
638  """
639  # if we have no hashes, we can't send anything
640  # but don't exit since it might mean all the Payloads were already on the server
641  if len(hashes) == 0:
642  self._outputter.write("No hashes to send - moving to metadata upload.")
643  return True
644  else:
645  self._outputter.write("Sending payloads of hashes not found:")
646  # construct connection string for local SQLite database file
647  database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if type(self.sqlite_file_name) == str else self.sqlite_file_name
648  # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW)
649  self._outputter.write("\tConnecting to input SQLite database.")
650  con = querying.connect(database, map_blobs=True)
651 
652  # query for the Payloads
653  self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
654  payloads = con.payload(hash=hashes)
655  # if we get a single Payload back, put it in a list and turn it into a json_list
656  if payloads.__class__ != data_sources.json_list:
657  payloads = data_sources.json_data_node.make([payloads])
658 
659  # close the session with the SQLite database file - we won't use it again
660  con.close_session()
661 
662  # if found some Payloads, send them
663  if payloads:
664  # Note: there is an edge case in which the SQLite file could have been queried
665  # to delete the Payloads since we queried it for IOV hashes. This may be handled in the next iteration.
666  # send http post with data blob in body, and everything else as URL parameters
667  # convert Payload to a dictionary - we can put most of this into the URL of the HTTPs request
668  dicts = payloads.as_dicts()
669  self._outputter.write("Uploading Payload BLOBs:")
670 
671  # for each payload, send the BLOB to the server
672  for n, payload in enumerate(dicts):
673  self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
674  response = self.send_blob(payload, upload_session_id)
675  # check response for errors
676  no_error = self.check_response_for_error_key(response, exit_if_error=True)
677  if not(no_error):
678  return False
679  self._outputter.write("\tPayload sent - moving to next one.")
680  self._outputter.write("All Payloads uploaded.")
681  return True
682  else:
683  return False
def uploads.uploader.upload (   self)
Calls methods that send HTTP requests to the upload server.
Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.

Definition at line 342 of file uploads.py.

References uploads.uploader._testing, uploads.uploader.check_response_for_error_key(), uploads.uploader.get_upload_session_id(), and uploads.uploader.upload_session_id.

343  def upload(self):
344  """
345  Calls methods that send HTTP requests to the upload server.
346  """
347 
348  """
349  Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
350  """
351  try:
352 
353  # get upload session, check response for error key
354  upload_session_data = self.get_upload_session_id()
355  no_error = self.check_response_for_error_key(upload_session_data)
356 
357  # if there was an error and we're testing, return False for the testing module
358  if not(no_error) and self._testing:
359  return False
360 
361  self.upload_session_id = upload_session_data["id"]
362  self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
363  self.server_side_log_file = upload_session_data["log_file"]
364 
365  except errors.NoMoreRetriesException as no_more_retries:
366  return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
367  except Exception as e:
368  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
369  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
370 
371  if not(self._verbose):
372  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
373  else:
374  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
375 
376  return self.exit_upload()
377 
378  """
379  Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
380  """
381  if self.data_to_send["fcsr_filter"] != None:
382  """
383  FCSR Filtering:
384  Filtering the IOVs before we send them by getting the First Conditions Safe Run
385  from the server based on the target synchronization type.
386  """
387  if self.data_to_send["inputTagData"]["time_type"] != "Time":
388  # run-based tags can be FCSR-filtered; time-based tags can't (the server applies the same rule)
389  try:
390  self.filter_iovs_by_fcsr(upload_session_id=self.upload_session_id)
391  # this function does not return a value, since it just operates on data - so no point checking for an error key
392  # the error key check is done inside the function on the response from the server
393  except errors.NoMoreRetriesException as no_more_retries:
394  return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
395  except Exception as e:
396  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
397  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
398 
399  if not(self._verbose):
400  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
401  else:
402  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
403 
404  return self.exit_upload()
405  else:
406  self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
407 
408  """
409  Check for the hashes that the server doesn't have - only send these (but in the next step).
410  """
411  try:
412 
413  check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
414  # check for an error key in the response
415  no_error = self.check_response_for_error_key(check_hashes_response)
416 
417  # if there was an error and we're testing, return False for the testing module
418  if not(no_error) and self._testing:
419  return False
420 
421  # finally, check hashes_not_found with hashes not found locally - if there is an intersection, we stop the upload
422  # because if a hash is not found and is not on the server, there is no data to upload
423  all_hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
424  hashes_not_found = check_hashes_response["hashes_not_found"]
425  hashes_found = list(set(all_hashes) - set(hashes_not_found))
426  self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
427  # check if any hashes not found on the server is used in the local SQLite database
428  for hash_not_found in hashes_not_found:
429  if hash_not_found in self.hashes_with_no_local_payload:
430  return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
431 
432  for hash_found in hashes_found:
433  if hash_found in self.hashes_with_no_local_payload:
434  self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)
435 
436  self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")
437 
438  except errors.NoMoreRetriesException as no_more_retries:
439  # for now, just write the log if we get a NoMoreRetriesException
440  return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
441  except Exception as e:
442  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
443  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
444 
445  if not(self._verbose):
446  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
447  else:
448  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
449 
450  return self.exit_upload()
451 
452  """
453  Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
454  exception handling is done inside this method, since it calls a method itself for each payload.
455  """
456  send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
457  if self._testing and not(send_payloads_response):
458  return False
459 
460  """
461  Final stage - send metadata to server (since the payloads are there now)
462  if this is successful, once it finished the upload session is closed on the server and the tag lock is released.
463  """
464  try:
465 
466  # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
467  send_metadata_response = self.send_metadata(self.upload_session_id)
468  no_error = self.check_response_for_error_key(send_metadata_response)
469  if not(no_error) and self._testing:
470  return False
471 
472  # we have to call this explicitly here since check_response_for_error_key only writes the log file
473  # if an error has occurred, whereas it should always be written here
474  self.write_server_side_log(send_metadata_response["log_data"])
475 
476  except errors.NoMoreRetriesException as no_more_retries:
477  return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
478  except Exception as e:
479  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
480  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
481 
482  if not(self._verbose):
483  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
484  else:
485  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
486 
487  return self.exit_upload()
488 
489  # close client side log handle
490  self._handle.close()
491 
492  # if we're running the testing script, return True to say the upload has worked
493  if self._testing:
494  return True
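Stripped of its specifics, `upload()` is a pipeline of stages (open session, FCSR filter, hash check, payload upload, metadata), each of which can abort the whole upload either by returning `False` or by raising. A minimal sketch of that control structure, using illustrative names rather than the actual API:

```python
# Minimal sketch of the staged, abort-on-failure control flow of upload();
# stage names and the (name, callable) shape are illustrative only.
def run_staged_upload(stages, log=print):
    """Run (name, callable) stages in order; a False return or an exception aborts."""
    for name, stage in stages:
        try:
            if stage() is False:  # a stage signals failure by returning False
                log("Stage '%s' reported failure - aborting upload." % name)
                return False
        except Exception as exc:
            log("Stage '%s' raised %r - aborting upload." % (name, exc))
            return False
    return True
```

This is why every `except` block in the listing ends in `return self.exit_upload(...)`: no later stage runs once an earlier one has failed, so the tag lock is released exactly once.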
def uploads.uploader.write_server_side_log (   self,
  log_data 
)
Given the log data from the server, write it to a client-side log file.

Definition at line 286 of file uploads.py.

References reco.print(), uploads.uploader.upload_log_file_name, and uploads.uploader.upload_session_id.

Referenced by uploads.uploader.check_response_for_error_key(), and uploads.uploader.exit_upload().

287  def write_server_side_log(self, log_data):
288  """
289  Given the log data from the server, write it to a client-side log file.
290  """
291  # if the server_side_log directory doesn't exist, create it
292  # without it we can't write the log when we download it from the server
293  if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
294  os.makedirs("server_side_logs/")
295 
296  # directory exists now, write to client-side log file
297  server_log_file_name = None
298  try:
299  # if the upload session does not exist yet, don't try to write the log file
300  if self.upload_session_id == None:
301  raise Exception("No upload session")
302  # create a write handle to the file, decode the log data from base64, write and close
303  server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
304  handle = open(server_log_file_name, "w")
305  handle.write(base64.b64decode(log_data))
306  handle.close()
307  except Exception as e:
308  # reset log file name to None so we don't try to write it later
309  server_log_file_name = None
310  #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")
311 
312  # tell the user where the log files are
313  # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
314  if server_log_file_name != None:
315  print("Log file from server written to '%s'." % server_log_file_name)
316  else:
317  print("No server log file could be written locally.")
318 
319  print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)
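The decode-and-write step above can be sketched in isolation. This assumes `log_data` is the base64-encoded log returned by the server; note that under Python 3 `base64.b64decode` returns bytes, so the file must be opened in binary mode (the listing opens it in text mode, which only works under Python 2):

```python
import base64
import os

def write_decoded_log(log_data, session_id, log_dir="server_side_logs"):
    """Decode base64 log data from the server and write it to a local file."""
    os.makedirs(log_dir, exist_ok=True)           # create the directory if missing
    path = os.path.join(log_dir, "upload_log_%s" % session_id)
    with open(path, "wb") as handle:              # binary mode: b64decode yields bytes
        handle.write(base64.b64decode(log_data))
    return path
```

Unlike the listing, this sketch does not swallow write errors; the listing deliberately resets `server_log_file_name` to `None` on failure so the user is told no server log could be written.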

Member Data Documentation

uploads.uploader._debug
private

Definition at line 98 of file uploads.py.

Referenced by FrontierCondition_GT_autoExpress_cfi.Tier0Handler._queryTier0DataSvc(), uploads.uploader.check_response_for_error_key(), cmsPerfSuite.PerfSuite.optionParse(), cmsPerfSuite.PerfSuite.runCmsReport(), FrontierCondition_GT_autoExpress_cfi.Tier0Handler.setDebug(), and FrontierCondition_GT_autoExpress_cfi.Tier0Handler.unsetDebug().

uploads.uploader._handle
private

Definition at line 108 of file uploads.py.

uploads.uploader._log_data
private

Definition at line 102 of file uploads.py.

Referenced by uploads.uploader.check_response_for_error_key(), and uploads.uploader.exit_upload().

uploads.uploader._outputter
private

Definition at line 111 of file uploads.py.

uploads.uploader._SERVICE_URL
private

Definition at line 103 of file uploads.py.

Referenced by uploads.uploader.close_upload_session(), uploads.uploader.get_fcsr_from_server(), uploads.uploader.get_hashes_to_send(), uploads.uploader.get_tag_dictionary(), uploads.uploader.get_upload_session_id(), uploads.uploader.send_blob(), and uploads.uploader.send_metadata().

uploads.uploader._testing
private

Definition at line 100 of file uploads.py.

Referenced by uploads.uploader.check_response_for_error_key(), uploads.uploader.exit_upload(), and uploads.uploader.upload().

uploads.uploader._verbose
private

Definition at line 99 of file uploads.py.

Referenced by HiHelperTools.MassSearchReplaceAnyInputTagVisitor.doIt(), helpers.MassSearchReplaceAnyInputTagVisitor.doIt(), ConfigBuilder.ConfigBuilder.MassSearchReplaceProcessNameVisitor.doIt(), HiHelperTools.MassSearchReplaceParamVisitor.enter(), helpers.MassSearchReplaceParamVisitor.enter(), cmsPerfSuite.PerfSuite.mkCandleDir(), cmsPerfSuite.PerfSuite.optionParse(), cmsPerfSuite.PerfSuite.printFlush(), cmsPerfSuite.PerfSuite.runcmd(), and cmsPerfSuite.PerfSuite.runCmdSet().

uploads.uploader.data_to_send

Definition at line 223 of file uploads.py.

Referenced by uploads.uploader.close_upload_session(), uploads.uploader.filter_iovs_by_fcsr(), uploads.uploader.get_all_hashes(), uploads.uploader.get_fcsr_from_server(), uploads.uploader.get_hashes_to_send(), uploads.uploader.get_upload_session_id(), uploads.uploader.send_blob(), and uploads.uploader.send_metadata().

uploads.uploader.hashes_with_no_local_payload

Definition at line 184 of file uploads.py.

uploads.uploader.input_tag

Definition at line 148 of file uploads.py.

uploads.uploader.metadata_source

Definition at line 120 of file uploads.py.

Referenced by uploads.uploader.get_tag_dictionary().

uploads.uploader.server_side_log_file

Definition at line 362 of file uploads.py.

uploads.uploader.sqlite_file_name

Definition at line 154 of file uploads.py.

Referenced by uploads.uploader.send_payloads().

uploads.uploader.upload_log_file_name

Definition at line 107 of file uploads.py.

Referenced by uploads.uploader.write_server_side_log().

uploads.uploader.upload_session_id

Definition at line 104 of file uploads.py.

Referenced by uploads.uploader.exit_upload(), uploads.uploader.send_metadata(), uploads.uploader.upload(), and uploads.uploader.write_server_side_log().