CMS 3D CMS Logo

 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
uploads.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 """
3 
4 Joshua Dawes - CERN, CMS - The University of Manchester
5 
6 This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.
7 
8 """
9 
10 import os
11 import json
12 import base64
13 from datetime import datetime
14 from urllib.parse import urlencode
15 import math
16 import sys
17 import traceback
18 import netrc
19 
20 from .url_query import url_query
21 from . import models
22 from . import errors
23 from . import data_sources
24 from . import querying
25 from .errors import *
26 from .utils import to_timestamp, to_datetime, friendly_since
27 
def friendly_since(time_type, since):
    """
    Takes a since and, if it is Run-based expressed as Lumi-based, returns the run number.
    Otherwise, returns the since without transformations.

    A Run-based since expressed as Lumi-based is the run number shifted left by 32 bits
    (lumi-section = 0), so it is recognised by its lower 32 bits all being zero.
    """
    # test the whole 32-bit lumi-section field - a narrower mask also matches
    # plain run numbers whose low bits happen to be zero and shifts them to 0
    if time_type == "Run" and (since & 0xffffffff) == 0:
        return since >> 32
    return since
37 
38 # this is simple, and works for now - if logging requirements change, I will write a logging class to manage logging
def log(file_handle, message):
    """
    Very simple logging function, used by output class.

    Writes message to file_handle as one line, prefixed in square brackets with
    the current UTC time as formatted by to_timestamp.
    """
    time_stamp = to_timestamp(datetime.utcnow())
    file_handle.write("[%s] %s\n" % (time_stamp, message))
44 
def new_log_file_id():
    """
    Find a new client-side log file name.

    Returns the number of existing files in ./upload_logs whose name contains
    "upload_log", plus one.

    Note: This cannot use the upload session token since logs need to be written before this is opened.
    However, this can be changed so that the filename that uses the token is written to once
    it is obtained.
    """
    log_directory = os.path.join(os.getcwd(), "upload_logs")
    # create the directory on first use - otherwise listing it (and opening the
    # new log file inside it straight afterwards) fails on a fresh working area
    os.makedirs(log_directory, exist_ok=True)
    # new id = number of log files + 1
    # (primitive - matching the hash of the upload session may be a better idea)
    log_files = [name for name in os.listdir(log_directory) if "upload_log" in name]
    return len(log_files) + 1
58 
class output():
    """
    Used to control output to the console and to the client-side log.
    """

    def __init__(self, log_handle=None, verbose=False):
        # verbose decides whether ordinary messages are printed to the console
        self._verbose = verbose
        # writable handle of the client-side log, or None to disable file logging
        self._log_handle = log_handle

    def write(self, message="", ignore_verbose=False):
        """
        Write to the console and to the log file held by self.

        The message is printed when verbose output is on or ignore_verbose is set;
        it is written to the log whenever a log handle was given.
        """
        if ignore_verbose or self._verbose:
            print(message)
        if self._log_handle is not None:
            log(self._log_handle, message)
80 
81 class uploader(object):
82  """
83  Upload session controller - creates, tracks, and deletes upload sessions on the server.
84  """
85 
def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
    """
    Upload constructor:
    Given an SQLite file and a Metadata source, reads into a dictionary ready for it to be encoded and uploaded.

    The result is stored in self.data_to_send: the Tag data ("inputTagData"), the list of
    IOVs ("iovs") and every entry of the metadata source.

    Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

    Note: default value of server should be changed for production.

    Note: with testing=True, exit_upload returns instead of exiting, so a failed check
    below does not stop the constructor.
    """
    # set private variables
    self._debug = debug
    self._verbose = verbose
    self._testing = testing
    # initialise server-side log data as empty string - will be replaced when we get a response back from the server
    self._log_data = ""
    self._SERVICE_URL = server
    self.upload_session_id = None

    # set up client-side log file
    self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
    self._handle = open(self.upload_log_file_name, "a")

    # set up client-side logging object
    self._outputter = output(verbose=verbose, log_handle=self._handle)
    self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)

    # expect a CondDBFW data_source object for metadata_source
    if metadata_source is None:
        # no upload metadata has been given - we cannot continue with the upload
        self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
    else:
        # set up global metadata source variable
        self.metadata_source = metadata_source.data()

    # check for the destination tag
    # this is required whatever type of upload we're performing
    destination_tags = self.metadata_source.get("destinationTags")
    if destination_tags is None:
        self.exit_upload("No destination Tag was given.")
    elif isinstance(destination_tags, dict) and (not destination_tags or next(iter(destination_tags)) is None):
        # an empty dictionary, or one whose first key is None, names no destination Tag either
        self.exit_upload("No destination Tag was given.")

    # make sure a destination database was given
    if self.metadata_source.get("destinationDatabase") is None:
        self.exit_upload("No destination database was given.")

    # get Conditions metadata
    if self.metadata_source.get("sourceDB") is None and self.metadata_source.get("hashToUse") is None:
        # we have neither an sqlite file nor the command line data
        self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
                         + "\nSee --help for command line argument information.")
    elif self.metadata_source.get("sourceDB") is not None:
        # We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
        # We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.

        # make sure we have an input tag to look for in the source db
        self.input_tag = metadata_source.data().get("inputTag")
        if self.input_tag is None:
            self.exit_upload("No input Tag name was given.")

        self.sqlite_file_name = self.metadata_source["sourceDB"]
        if not os.path.isfile(self.sqlite_file_name):
            self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
        sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))

        try:
            self._outputter.write("Getting Tag and IOVs from SQLite database.")

            # query for Tag, check for existence, then convert to dictionary
            # ('== None' is kept for query results - CondDBFW objects may customise equality; TODO confirm)
            tag = sqlite_con.tag(name=self.input_tag)
            if tag == None:
                self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
            tag = tag.as_dicts(convert_timestamps=True)

            # query for IOVs, check for existence, then convert to dictionaries
            iovs = sqlite_con.iov(tag_name=self.input_tag)
            if iovs == None:
                self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
            iovs = iovs.as_dicts(convert_timestamps=True)
            if not isinstance(iovs, list):
                iovs = [iovs]

            # Finally, get the list of all Payload hashes of IOVs,
            # then compute the list of hashes for which there is no Payload locally -
            # this is used later to decide if we can continue the upload if the Payload was not found on the server.
            iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
            if iovs_for_hashes.__class__ == data_sources.json_list:
                hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
            else:
                hashes_of_iovs = [iovs_for_hashes.payload_hash]
            self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]
        finally:
            # close session open on SQLite database file, even if one of the queries failed
            sqlite_con.close_session()

    elif metadata_source.data().get("hashToUse") != None:
        # Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
        # We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.

        now = to_timestamp(datetime.utcnow())
        # tag dictionary will be taken from the server
        # this does not require any authentication
        tag = self.get_tag_dictionary()
        # the server may answer with an error dictionary instead of Tag data, so check before using it
        self.check_response_for_error_key(tag)
        iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
                 "insertion_time" : now}]

        # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
        self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]

        # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side

    # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
    if tag["time_type"] == "Run":
        for iov in iovs:
            iov["since"] = iov["since"] << 32

    result_dictionary = {"inputTagData" : tag, "iovs" : iovs}

    # add command line arguments to dictionary
    # remembering that metadata_source is a json_dict object
    result_dictionary.update(metadata_source.data())

    # store in instance variable
    self.data_to_send = result_dictionary

    # if the since doesn't exist, take the first since from the list of IOVs
    if result_dictionary.get("since") is None:
        result_dictionary["since"] = min(iovs, key=lambda iov : iov["since"])["since"]
    elif self.data_to_send["inputTagData"]["time_type"] == "Run":
        # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
        self.data_to_send["since"] = self.data_to_send["since"] << 32

    # TODO - Settle on a single destination tag format.
    # look for deprecated metadata entries - give warnings
    # Note - we only really support this format
    try:
        if isinstance(result_dictionary["destinationTags"], dict):
            self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
    except Exception as e:
        self._outputter.write("ERROR: %s" % str(e))
240 
@check_response(check="json")
def get_tag_dictionary(self):
    """
    Ask the server's get_tag_dictionary end-point for the dictionary describing the destination Tag.

    The request carries the destination Tag name and the destination database as URL parameters.
    """
    url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
    request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
    return request.send()
247 
def check_response_for_error_key(self, response_dict, exit_if_error=True):
    """
    Checks the decoded response of an HTTP request to the server.
    If it is a dictionary, and one of its keys is "error", the server returned an error.

    On an error the message (and, with --debug, the traceback) is shown and the server-side
    log is written locally. Then, if exit_if_error is set, the script exits - or False is
    returned when testing.

    Returns True in every other case, including a reported error with exit_if_error=False.
    A response without an error key has its "log_data", if any, stored in self._log_data.
    """
    if not isinstance(response_dict, dict):
        # a decoded response that is not a dictionary cannot carry an error key or log data
        return True

    # if the decoded response data has an error key in it, we should display an error and its traceback
    if "error" in response_dict:
        splitter_string = "\n%s\n" % ("-"*50)
        self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
        self._outputter.write(response_dict["error"], ignore_verbose=True)

        # if the user has given the --debug flag, show the traceback as well
        if self._debug:
            # suggest to the user to email this to db upload experts
            self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
            if response_dict.get("traceback") is not None:
                self._outputter.write(response_dict["traceback"], ignore_verbose=True)
            else:
                self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
        else:
            self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)

        # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
        # if an error has occurred on the server side, a log will have been written
        self.write_server_side_log(response_dict.get("log_data"))

        if exit_if_error:
            if self._testing:
                return False
            sys.exit()
    elif "log_data" in response_dict:
        # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
        self._log_data = response_dict["log_data"]
    return True
283 
def write_server_side_log(self, log_data):
    """
    Given the log data from the server, write it to a client-side log file.

    log_data is the base64-encoded log; it is decoded and stored as
    server_side_logs/upload_log_<upload session id>. Nothing is written when no upload
    session exists yet or the data cannot be decoded or saved. Either way the user is told
    where the log files are.
    """
    # if the server_side_log directory doesn't exist, create it
    # without it we can't write the log when we download it from the server
    os.makedirs(os.path.join(os.getcwd(), "server_side_logs/"), exist_ok=True)

    # directory exists now, write to client-side log file
    server_log_file_name = None
    # if the upload session does not exist yet, don't try to write the log file
    if self.upload_session_id is not None and log_data is not None:
        candidate_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
        try:
            # decode first, so undecodable data does not leave an empty file behind
            decoded_log = base64.b64decode(log_data)
            # b64decode returns bytes, so the file has to be opened in binary mode
            with open(candidate_name, "wb") as handle:
                handle.write(decoded_log)
            server_log_file_name = candidate_name
        except (TypeError, ValueError, OSError):
            # leave the name as None so the failure is reported to the user below
            pass

    # tell the user where the log files are
    # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
    if server_log_file_name is not None:
        print("Log file from server written to '%s'." % server_log_file_name)
    else:
        print("No server log file could be written locally.")

    print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)
317 
def exit_upload(self, message=None):
    """
    Used to exit the script - which only happens if an error has occurred.
    If the --testing flag was passed by the user, we should return False for failure, and not exit.

    Closes the upload session on the server if one was obtained, closes the client-side
    log handle and prints message if one is given.
    """
    if self.upload_session_id is not None:
        # only try to close the upload session if an upload session has been obtained
        response = self.close_upload_session(self.upload_session_id)
        no_error = self.check_response_for_error_key(response)
        # if no error was found in the upload session closure request,
        # we still have to write the server side log
        if no_error:
            self.write_server_side_log(self._log_data)
    # close client-side log handle
    self._handle.close()
    if message is not None:
        print("\n%s\n" % message)
    if self._testing:
        return False
    sys.exit()
339 
def upload(self):
    """
    Calls methods that send HTTP requests to the upload server.

    The stages are: open an upload session, optionally filter the IOVs by FCSR, ask the
    server which Payload hashes it is missing, send those Payloads, then send the metadata.

    When testing, returns True if every stage succeeded and False as soon as one fails.
    Otherwise a failure exits the script and success returns None.
    """

    def give_up_after_unexpected_error():
        # something went wrong that we have no specific exception for, so output the traceback and exit
        self._outputter.write(traceback.format_exc(), ignore_verbose=True)

        if not self._verbose:
            self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
        else:
            self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

        return self.exit_upload()

    # Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
    try:
        # get upload session, check response for error key
        upload_session_data = self.get_upload_session_id()
        no_error = self.check_response_for_error_key(upload_session_data)

        # if there was an error and we're testing, return False for the testing module
        if not no_error and self._testing:
            return False

        self.upload_session_id = upload_session_data["id"]
        self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
        self.server_side_log_file = upload_session_data["log_file"]

    except errors.NoMoreRetriesException:
        return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
    except Exception:
        return give_up_after_unexpected_error()

    # Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
    if self.data_to_send.get("fcsr_filter") is not None:
        # FCSR Filtering:
        # Filtering the IOVs before we send them by getting the First Conditions Safe Run
        # from the server based on the target synchronization type.
        if self.data_to_send["inputTagData"]["time_type"] != "Time":
            try:
                # this function does not return a value, since it just operates on data - so no point checking for an error key
                # the error key check is done inside the function on the response from the server
                self.filter_iovs_by_fcsr(self.upload_session_id)
            except errors.NoMoreRetriesException:
                return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
            except Exception:
                return give_up_after_unexpected_error()
        else:
            # if we have a time-based tag, we can't do FCSR validation - this is also the case on the server side
            self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")

    # Check for the hashes that the server doesn't have - only send these (but in the next step).
    try:
        check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
        # check for an error key in the response
        no_error = self.check_response_for_error_key(check_hashes_response)

        # if there was an error and we're testing, return False for the testing module
        if not no_error and self._testing:
            return False

        # finally, check hashes_not_found with hashes not found locally - if there is an intersection, we stop the upload
        # because if a hash is not found and is not on the server, there is no data to upload
        all_hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
        hashes_not_found = check_hashes_response["hashes_not_found"]
        hashes_found = list(set(all_hashes) - set(hashes_not_found))
        self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
        # check if any hashes not found on the server is used in the local SQLite database
        for hash_not_found in hashes_not_found:
            if hash_not_found in self.hashes_with_no_local_payload:
                return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)

        for hash_found in hashes_found:
            if hash_found in self.hashes_with_no_local_payload:
                self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)

        self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")

    except errors.NoMoreRetriesException:
        # for now, just write the log if we get a NoMoreRetriesException
        return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
    except Exception:
        return give_up_after_unexpected_error()

    # Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
    # exception handling is done inside this method, since it calls a method itself for each payload.
    send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
    if self._testing and not send_payloads_response:
        return False

    # Final stage - send metadata to server (since the payloads are there now)
    # if this is successful, once it finished the upload session is closed on the server and the tag lock is released.
    try:
        # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
        send_metadata_response = self.send_metadata(self.upload_session_id)
        no_error = self.check_response_for_error_key(send_metadata_response)
        if not no_error and self._testing:
            return False

        # we have to call this explicitly here since check_response_for_error_key only writes the log file
        # if an error has occurred, whereas it should always be written here
        # (the check above has just stored the log data of this response, if any, in self._log_data)
        self.write_server_side_log(self._log_data)

    except errors.NoMoreRetriesException:
        return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
    except Exception:
        return give_up_after_unexpected_error()

    # close client side log handle
    self._handle.close()

    # if we're running the testing script, return True to say the upload has worked
    if self._testing:
        return True
492 
@check_response(check="json")
def get_upload_session_id(self):
    """
    Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
    as long as the upload session is still open.
    """
    self._outputter.write("Getting upload session.")

    # send password in the body so it can be encrypted over https
    # username and password are taken from the netrc file
    # at this point, the value in username_or_token is always a username, since
    # this method's end result is obtaining a token.
    credentials = {
        "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
        "username_or_token" : self.data_to_send["username"],
        "password" : self.data_to_send["password"]
    }
    body_data = base64.b64encode(json.dumps(credentials).encode('UTF-8'))

    url_data = {"database" : self.data_to_send["destinationDatabase"]}

    query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
    return query.send()
518 
@check_response(check="json")
def close_upload_session(self, upload_session_id):
    """
    Close an upload session on the server by calling its close_upload_session end-point.
    This is done if there is an error on the client-side.
    """
    self._outputter.write("An error occurred - closing the upload session on the server.")
    url_data = {
        "database" : self.data_to_send["destinationDatabase"],
        "upload_session_id" : upload_session_id
    }
    query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
    return query.send()
530 
@check_response(check="json")
def get_fcsr_from_server(self, upload_session_id):
    """
    Execute the HTTPs request to ask the server for the FCSR.

    Note: we do this in a separate function so we can do the decoding check for json data with check_response.
    """
    # tiny amount of client-side logic here - all of the work is done on the server
    # tier0_response uses get() so if the key isn't present, we default to None
    # tier0_response is for replaying uploads from the old upload service, with knowledge of the tier0 response
    # when those uploads happened.
    url_data = {
        "database" : self.data_to_send["destinationDatabase"],
        "upload_session_id" : upload_session_id,
        "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
        "sourceTagSync" : self.data_to_send["fcsr_filter"],
        "tier0_response" : self.data_to_send.get("tier0_response")
    }
    query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
    return query.send()
552 
def filter_iovs_by_fcsr(self, upload_session_id):
    """
    Ask the server for the FCSR based on the synchronization type of the source Tag.
    Then, modify the IOVs (possibly remove some) based on the FCSR we received.
    This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.

    Works in place on self.data_to_send ("since" and "iovs"). Returns nothing on success;
    on the offline failure path it returns the result of exit_upload.
    """
    self._outputter.write("Getting the First Condition Safe Run for the current sync type.")

    fcsr_data = self.get_fcsr_from_server(upload_session_id)
    fcsr = fcsr_data["fcsr"]
    fcsr_changed = fcsr_data["fcsr_changed"]
    new_sync = fcsr_data["new_sync"]

    time_type = self.data_to_send["inputTagData"]["time_type"]

    if fcsr_changed:
        self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))

    self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
                          % (self.data_to_send["fcsr_filter"], friendly_since(time_type, fcsr)))

    # There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
    # Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
    # Note: this applies to run, lumi and timestamp run_types.

    # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
    if fcsr > self.data_to_send["since"]:
        # check if we're uploading to offline sync - if so, then user since must be >= fcsr, so we should report an error
        if self.data_to_send["fcsr_filter"].lower() == "offline":
            self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
            # stop here - when testing, exit_upload returns instead of exiting and the IOVs must stay untouched
            return self.exit_upload()
        self.data_to_send["since"] = fcsr

    self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
                          % friendly_since(time_type, int(self.data_to_send["since"])))

    # Post validation processing assuming destination since is now valid.
    #
    # Because we don't have an sqlite database to query (everything's in a dictionary),
    # we have to go through the IOVs manually to find the greatest since that's less than
    # or equal to the destination since.
    #
    # Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
    destination_since = self.data_to_send["since"]
    iovs = self.data_to_send["iovs"]
    sinces = [iov["since"] for iov in iovs]
    sinces_up_to_dest = [since for since in sinces if since <= destination_since]
    # when no IOV starts at or before the destination since, fall back to the earliest IOV
    # (taking max/min over all IOVs keeps this correct even if the list is not sorted by since)
    max_since_below_dest = max(sinces_up_to_dest) if sinces_up_to_dest else min(sinces)

    # only select iovs that have sinces >= max_since_below_dest
    # and then shift any IOVs left to the destination since
    kept_iovs = [iov for iov in iovs if iov["since"] >= max_since_below_dest]
    # modify insertion_time of iovs
    new_time = to_timestamp(datetime.utcnow())
    for iov in kept_iovs:
        if iov["since"] < destination_since:
            iov["since"] = destination_since
        iov["insertion_time"] = new_time
    self.data_to_send["iovs"] = kept_iovs
615 
def get_all_hashes(self):
    """
    Get all the hashes from the dictionary of IOVs we have from the SQLite file.

    Returns the "payload_hash" of every IOV in self.data_to_send["iovs"], in order and
    without removing duplicates.
    """
    self._outputter.write("\tGetting list of all hashes found in SQLite database.")
    return [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
623 
@check_response(check="json")
def get_hashes_to_send(self, upload_session_id):
    """
    Get the hashes of the payloads we want to send that the server doesn't have yet.

    The hashes of all IOVs are sent as a JSON list in the body of a request to the
    check_hashes end-point.
    """
    self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
    post_data = json.dumps(self.get_all_hashes())
    url_data = {
        "database" : self.data_to_send["destinationDatabase"],
        "upload_session_id" : upload_session_id
    }
    query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
    return query.send()
635 
def send_payloads(self, hashes, upload_session_id):
    """
    Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.

    Returns True if there was nothing to send or every Payload was sent, and False if no
    Payload was found locally for the hashes or the server reported an error for one.
    """
    # if we have no hashes, we can't send anything
    # but don't exit since it might mean all the Payloads were already on the server
    if not hashes:
        self._outputter.write("No hashes to send - moving to metadata upload.")
        return True

    self._outputter.write("Sending payloads of hashes not found:")
    # construct connection string for local SQLite database file
    if isinstance(self.sqlite_file_name, str):
        database = "sqlite://%s" % os.path.abspath(self.sqlite_file_name)
    else:
        database = self.sqlite_file_name
    # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW)
    self._outputter.write("\tConnecting to input SQLite database.")
    con = querying.connect(database, map_blobs=True)

    try:
        # query for the Payloads
        self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
        byte_hashes = [bytes(h, 'utf-8') for h in hashes]
        payloads = con.payload(hash=byte_hashes)
        # if we get a single Payload back, put it in a list and turn it into a json_list
        if payloads and payloads.__class__ != data_sources.json_list:
            payloads = data_sources.json_data_node.make([payloads])
    finally:
        # close the session with the SQLite database file - we won't use it again
        con.close_session()

    # nothing found locally for these hashes, so nothing can be sent
    if not payloads:
        return False

    # Note: there is an edge case in which the SQLite file could have been queried
    # to delete the Payloads since we queried it for IOV hashes. This may be handled in the next iteration.
    # send http post with data blob in body, and everything else as URL parameters
    # convert Payload to a dictionary - we can put most of this into the URL of the HTTPs request
    dicts = payloads.as_dicts()
    self._outputter.write("Uploading Payload BLOBs:")

    # for each payload, send the BLOB to the server
    for position, payload in enumerate(dicts, start=1):
        self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (position, len(dicts), payload["hash"]))
        response = self.send_blob(payload, upload_session_id)
        # check response for errors
        no_error = self.check_response_for_error_key(response, exit_if_error=True)
        if not no_error:
            return False
        self._outputter.write("\tPayload sent - moving to next one.")
    self._outputter.write("All Payloads uploaded.")
    return True
686 
@check_response(check="json")
def send_blob(self, payload, upload_session_id):
    """
    Send the BLOB of one payload to the server over HTTPs.

    The base64-encoded BLOB is placed in the request body, so the server only has to decode it from base64.
    Every other entry of the payload is sent as a URL parameter, next to the destination database and
    the upload session id.

    payload           - dictionary describing the Payload; "data" holds the BLOB (bytes-like, since it is
                        handed to base64.b64encode) and "hash" is used in the log messages.
    upload_session_id - id of the current upload session, forwarded as a URL parameter.

    Returns the response of the request. If sending raises an exception, a JSON string with the keys
    "error" and "traceback" is returned instead.
    """
    # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPs request
    blob_data = base64.b64encode(payload["data"])

    url_data = {"database": self.data_to_send["destinationDatabase"], "upload_session_id": upload_session_id}

    # everything except the BLOB goes into the URL; the insertion time is converted with to_timestamp first
    url_data.update(
        (key, to_timestamp(value) if key == "insertion_time" else value)
        for key, value in payload.items()
        if key != "data"
    )

    request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)

    # send the request and return the response
    # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
    try:
        return request.send()
    except Exception as e:
        # isinstance (rather than an exact type comparison) also recognises subclasses of NoMoreRetriesException
        if isinstance(e, errors.NoMoreRetriesException):
            self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
            self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
        # report the failure in the JSON form the response check expects, whatever the exception was
        return json.dumps({"error": str(e), "traceback": traceback.format_exc()})
720 
@check_response(check="json")
def send_metadata(self, upload_session_id):
    """
    Last step of the upload - post the Conditions metadata (Tag and IOVs, as opposed to upload metadata).
    Per the original notes, the server closes the session (and releases the tag lock) once it has finished processing.

    upload_session_id - id of the current upload session, forwarded as a URL parameter.

    Returns the response of the request.
    """
    metadata = self.data_to_send

    # supply a default user text, naming the first destination tag, when none was given
    if metadata["userText"] in ("", None):
        first_tag = list(metadata["destinationTags"].keys())[0]
        metadata["userText"] = "Tag '%s' uploaded from CondDBFW client." % first_tag

    announcement = "Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."
    self._outputter.write(announcement % self.upload_session_id)

    # send the HTTPs request to the server: identifiers go in the URL, the full metadata as JSON in the body
    url_data = dict(
        database=metadata["destinationDatabase"],
        upload_session_id=upload_session_id,
        tier0_response=metadata.get("tier0_response"),
    )
    target_url = self._SERVICE_URL + "upload_metadata/"
    request = url_query(url=target_url, url_data=url_data, body=json.dumps(metadata))
    response = request.send()

    self._outputter.write("Response received - conditions upload process complete.")
    return response
741 
if __name__ == "__main__":
    # This code should only be executed for testing.
    import sys
    from .uploadConditions import parse_arguments

    print(
"""
This code should only be executed for testing.
Any uploads done by the user should be done by calling the uploadConditions.py script.
See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
"""
    )

    upload_metadata = parse_arguments()

    # the uploader is given the source database under the "sqlite_file" key
    upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB")

    # copy everything except "metadata_source" into a fresh dictionary ...
    upload_metadata_argument = {
        key: value
        for key, value in upload_metadata.items()
        if key != "metadata_source"
    }

    # ... and wrap that copy as the metadata source handed to the uploader
    upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument)

    upload_controller = uploader(**upload_metadata)

    result = upload_controller.upload()
def get_fcsr_from_server
Definition: uploads.py:532
hashes_with_no_local_payload
Definition: uploads.py:182
def get_upload_session_id
Definition: uploads.py:494
def new_log_file_id
Definition: uploads.py:45
current_output_length
Definition: uploads.py:66
void print(TMatrixD &m, const char *label=nullptr, bool mathematicaFormat=false)
Definition: Utilities.cc:47
def connect
Definition: querying.py:453
def friendly_since
Definition: uploads.py:28
def log
Definition: uploads.py:39
def get_all_hashes
Definition: uploads.py:616
def check_response_for_error_key
Definition: uploads.py:248
def write_server_side_log
Definition: uploads.py:284
def __init__
Definition: uploads.py:64
def to_timestamp
Definition: conddb_time.py:13
def filter_iovs_by_fcsr
Definition: uploads.py:553
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:79
def get_tag_dictionary
Definition: uploads.py:242
def get_hashes_to_send
Definition: uploads.py:625
#define str(s)
def close_upload_session
Definition: uploads.py:520
def check_response
Definition: errors.py:27