uploads.py

#!/usr/bin/env python3
"""
Joshua Dawes - CERN, CMS - The University of Manchester

This module holds classes to help with uploading conditions to the drop box web service,
which also uses CondDBFW to read and write data.
"""
from __future__ import print_function
from __future__ import absolute_import

import os
import json
import base64
from datetime import datetime
# urlencode lives in urllib.parse in Python 3 (it was in urllib in Python 2)
from urllib.parse import urlencode
import math
import sys
import traceback
import netrc

from .url_query import url_query
from . import models
from . import errors
from . import data_sources
from . import querying
from .errors import *
from .utils import to_timestamp, to_datetime, friendly_since

# this local definition shadows the friendly_since imported from .utils above
def friendly_since(time_type, since):
    """
    Takes a since and, if it is Run-based but expressed in the packed Lumi-based
    format (run << 32), returns the bare run number.
    Otherwise, returns the since without transformations.
    """
    # a since packed from a run number has all lower 32 bits (the lumi section) equal to zero
    if time_type == "Run" and (since & 0xffffffff) == 0:
        return since >> 32
    else:
        return since

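# Illustrative sketch (values invented) of how run numbers round-trip through the
# packed since format assumed above.
def _demo_friendly_since():
    packed = 300000 << 32                                   # run 300000, lumi section 0
    assert friendly_since("Run", packed) == 300000          # unpacked back to the run number
    mid_run = (300000 << 32) | 15                           # non-zero lumi section
    assert friendly_since("Run", mid_run) == mid_run        # returned unchanged
    assert friendly_since("Time", packed) == packed         # non-Run time types are untouched
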
# this is simple, and works for now - if logging requirements change, I will write a logging class to manage logging
def log(file_handle, message):
    """
    Very simple logging function, used by the output class.
    """
    file_handle.write("[%s] %s\n" % (to_timestamp(datetime.now()), message))

def new_log_file_id():
    """
    Find a new client-side log file name.

    Note: This cannot use the upload session token since logs need to be written before this is opened.
    However, this can be changed so that the filename that uses the token is written to once
    it is obtained.
    """
    # new id = number of log files + 1
    # (primitive - matching the hash of the upload session may be a better idea)
    # make sure the log directory exists before listing it
    log_directory = os.path.join(os.getcwd(), "upload_logs")
    if not os.path.exists(log_directory):
        os.makedirs(log_directory)
    log_files = [file for file in os.listdir(log_directory) if "upload_log" in file]
    new_id = len(log_files) + 1
    return new_id

class output():
    """
    Used to control output to the console and to the client-side log.
    """

    INFO = 0
    ERROR = 1
    WARNING = 2
    VERBOSE = 3
    DEBUG = 4

    def __init__(self, log_handle=None, verbose=False, debug=False):
        # first time writing progress bar, don't need to go back along the line
        self.current_output_length = 0
        self._verbose = verbose
        self._log_handle = log_handle
        self._debug = debug
        self.labels = ["INFO", "ERROR", "WARNING", "VERBOSE", "DEBUG"]

    def write(self, message="", level=INFO):
        """
        Write to the console and to the log file held by self.
        """
        message = "[%s] %s: %s" % (datetime.now(), self.labels[level], message)
        if self._verbose:
            if level == output.DEBUG and self._debug:
                print(message)
            elif level < output.DEBUG:
                print(message)
        elif self._debug:
            if level == output.DEBUG:
                print(message)
        elif level <= output.ERROR:
            print(message)
        if self._log_handle is not None:
            log(self._log_handle, message)

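# Minimal usage sketch (illustrative; the file name is invented) showing how the
# output class mirrors messages to a log file while filtering console output by level.
def _demo_output():
    with open("demo_client_log", "a") as handle:
        out = output(log_handle=handle, verbose=True)
        out.write("opening upload session", output.INFO)      # printed and logged
        out.write("checking payload hashes", output.VERBOSE)  # printed (verbose mode) and logged
        out.write("raw server response", output.DEBUG)        # logged only: debug is off here
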
class uploader(object):
    """
    Upload session controller - creates, tracks, and deletes upload sessions on the server.
    """

    def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
        """
        Upload constructor:
        Given an SQLite file and a metadata source, reads the data into a dictionary ready to be encoded and uploaded.

        Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

        Note: the default value of server should be changed for production.
        """
        # set private variables
        self._debug = debug
        self._verbose = verbose
        self._testing = testing
        # initialise server-side log data as an empty string - it will be replaced when we get a response back from the server
        self._log_data = ""
        self._SERVICE_URL = server
        self.upload_session_id = None

        # set up client-side log file
        self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
        self._handle = open(self.upload_log_file_name, "a")

        # set up client-side logging object
        self._outputter = output(verbose=verbose, log_handle=self._handle, debug=self._debug)
        self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)

        # expect a CondDBFW data_source object for metadata_source
        if metadata_source is None:
            # no upload metadata has been given - we cannot continue with the upload
            self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
        else:
            # set up global metadata source variable
            self.metadata_source = metadata_source.data()

        # check for the destination tag
        # this is required whatever type of upload we're performing
        if self.metadata_source.get("destinationTags") is None:
            self.exit_upload("No destination Tag was given.")
        else:
            # dict.keys() is a view in Python 3, so wrap it in list() before indexing
            if isinstance(self.metadata_source.get("destinationTags"), dict) and list(self.metadata_source.get("destinationTags").keys())[0] is None:
                self.exit_upload("No destination Tag was given.")

        # make sure a destination database was given
        if self.metadata_source.get("destinationDatabase") is None:
            self.exit_upload("No destination database was given.")

        # get Conditions metadata
        if self.metadata_source.get("sourceDB") is None and self.metadata_source.get("hashToUse") is None:
            # we have neither an sqlite file nor the command line data
            self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."
                             + "\nSee --help for command line argument information.")
        elif self.metadata_source.get("sourceDB") is not None:
            # We've been given an SQLite file, so try to extract Conditions metadata based on it and the upload metadata in metadata_source.
            # We now extract the Tag and IOV data from SQLite; it is added to the dictionary for sending over HTTPS later.

            # make sure we have an input tag to look for in the source db
            self.input_tag = metadata_source.data().get("inputTag")
            if self.input_tag is None:
                self.exit_upload("No input Tag name was given.")

            # set empty dictionary to contain Tag and IOV data from SQLite
            result_dictionary = {}
            self.sqlite_file_name = self.metadata_source["sourceDB"]
            if not os.path.isfile(self.sqlite_file_name):
                self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
            sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))

            self._outputter.write("Getting Tag and IOVs from SQLite database.", output.VERBOSE)

            # query for Tag, check for existence, then convert to dictionary
            tag = sqlite_con.tag(name=self.input_tag)
            if tag is None:
                self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
            tag = tag.as_dicts(convert_timestamps=True)

            # query for IOVs, check for existence, then convert to dictionaries
            iovs = sqlite_con.iov(tag_name=self.input_tag)
            if iovs is None:
                self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
            iovs = iovs.as_dicts(convert_timestamps=True)
            iovs = [iovs] if not isinstance(iovs, list) else iovs

            # Finally, get the list of all Payload hashes of the IOVs, then compute the
            # list of hashes for which there is no local Payload; this is used later to
            # decide whether we can continue the upload if a Payload is not found on the server.
            iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
            if iovs_for_hashes.__class__ == data_sources.json_list:
                hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
            else:
                hashes_of_iovs = [iovs_for_hashes.payload_hash]
            self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) is None]

            # close session open on SQLite database file
            sqlite_con.close_session()

        elif metadata_source.data().get("hashToUse") is not None:
            # Assume we've been given metadata on the command line (since no sqlite file is there, and we have command line arguments).
            # We now use Tag and IOV data from the command line; it is added to the dictionary for sending over HTTPS later.

            # set empty dictionary to contain Tag and IOV data from command line
            result_dictionary = {}

            now = to_timestamp(datetime.now())
            # tag dictionary will be taken from the server
            # this does not require any authentication
            tag = self.get_tag_dictionary()
            self.check_response_for_error_key(tag)
            iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"],
                     "payload_hash" : self.metadata_source["hashToUse"], "insertion_time" : now}]

            # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
            self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]

            # Note: normal optimisations will still take place - the hash checking stage can tell if hashToUse does not exist on the server side

            # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
            if tag["time_type"] == "Run":
                for (i, iov) in enumerate(iovs):
                    iovs[i]["since"] = iovs[i]["since"] << 32

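            # Worked example (illustrative numbers): for a Run-based tag, a since of
            # run 300000 becomes 300000 << 32 = 1288490188800000, i.e. run 300000 with
            # lumi section 0, matching the packed format unpacked by friendly_since().
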
        result_dictionary = {"inputTagData" : tag, "iovs" : iovs}

        # add command line arguments to dictionary
        # remembering that metadata_source is a json_dict object
        result_dictionary.update(metadata_source.data())

        # store in instance variable
        self.data_to_send = result_dictionary

        # if the since doesn't exist, take the first since from the list of IOVs
        if result_dictionary.get("since") is None:
            result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
        elif self.data_to_send["inputTagData"]["time_type"] == "Run":
            # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
            self.data_to_send["since"] = self.data_to_send["since"] << 32

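    # Illustrative sketch of the dictionary assembled by the constructor (the key
    # names come from this module; the values here are invented for documentation):
    #
    #   self.data_to_send = {
    #       "inputTagData" : {"time_type" : "Run", ...},
    #       "iovs" : [{"tag_name" : "ExampleTag_v1", "since" : 300000 << 32,
    #                  "payload_hash" : "4d...", "insertion_time" : "..."}],
    #       "destinationTags" : {"ExampleTag_v1" : {}},
    #       "destinationDatabase" : "oracle://cms_orcoff_prep/CMS_CONDITIONS",
    #       "since" : 300000 << 32, "userText" : "...", "username" : "...", "password" : "...",
    #   }
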
    @check_response(check="json")
    def get_tag_dictionary(self):
        url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
        request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
        response = request.send()
        return response

    def check_response_for_error_key(self, response_dict, exit_if_error=True):
        """
        Checks the decoded response of an HTTP request to the server.
        If it is a dictionary, and one of its keys is "error", the server returned an error.
        """
        # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
        if isinstance(response_dict, dict) and "error" in response_dict.keys():
            splitter_string = "\n%s\n" % ("-" * 50)
            self._outputter.write("\nERROR: %s" % splitter_string, output.ERROR)
            self._outputter.write(response_dict["error"], output.ERROR)

            # if the user has given the --debug flag, show the traceback as well
            if self._debug:
                # suggest to the user to email this to db upload experts
                self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, output.DEBUG)
                if response_dict.get("traceback") is not None:
                    self._outputter.write(response_dict["traceback"], output.DEBUG)
                else:
                    self._outputter.write("No traceback was returned from the server.", output.DEBUG)
            else:
                self._outputter.write("Use the --debug option to show the traceback of this error.", output.INFO)

            # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
            # if an error has occurred on the server side, a log will have been written
            self.write_server_side_log(response_dict.get("log_data"))

            if exit_if_error:
                if self._testing:
                    return False
                else:
                    exit()
        elif not ("error" in response_dict.keys()) and "log_data" in response_dict.keys():
            # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
            self._log_data = response_dict["log_data"][2:-1]
            return True

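    # Illustrative response shapes assumed by the check above (all values invented):
    #   error:   {"error" : "Tag 'ExampleTag_v1' is locked", "traceback" : "...", "log_data" : "b'<base64>'"}
    #   success: {"log_data" : "b'<base64>'", ...}
    # The [2:-1] slice appears to strip a stringified bytes wrapper (b'...') from the log data.
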
    def write_server_side_log(self, log_data):
        """
        Given the log data from the server, write it to a client-side log file.
        """
        # if the server_side_log directory doesn't exist, create it
        # without it we can't write the log when we download it from the server
        if not os.path.exists(os.path.join(os.getcwd(), "server_side_logs/")):
            os.makedirs("server_side_logs/")

        # directory exists now, write to client-side log file
        server_log_file_name = None
        try:
            # if the upload session does not exist yet, don't try to write the log file
            if self.upload_session_id is None:
                raise Exception("No upload session")
            # create a write handle to the file, decode the log data from base64, write and close
            server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
            # b64decode returns bytes, so the file has to be opened in binary mode
            handle = open(server_log_file_name, "wb")
            handle.write(base64.b64decode(log_data))
            handle.close()
        except Exception as e:
            # reset log file name to None so we don't try to write it later
            server_log_file_name = None
            #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")

        # tell the user where the log files are
        # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
        if self._SERVICE_URL.startswith("https://cms-conddb-dev.cern.ch/cmsDbCondUpload"):
            logUrl = "https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prep/%s" % self.upload_session_id
        else:
            logUrl = "https://cms-conddb.cern.ch/cmsDbBrowser/logs/show_cond_uploader_log/Prod/%s" % self.upload_session_id

        print("[%s] INFO: Server log found at %s." % (datetime.now(), logUrl))
        if server_log_file_name is not None:
            print("[%s] INFO: Local copy of server log file at '%s'." % (datetime.now(), server_log_file_name))
        else:
            print("No server log file could be written locally.")

        print("[%s] INFO: Local copy of client log file at '%s'." % (datetime.now(), self.upload_log_file_name))

    def exit_upload(self, message=None):
        """
        Used to exit the script - which only happens if an error has occurred.
        If the --testing flag was passed by the user, we should return False for failure, and not exit.
        """
        if self.upload_session_id is not None:
            # only try to close the upload session if an upload session has been obtained
            response = self.close_upload_session(self.upload_session_id)
            no_error = self.check_response_for_error_key(response)
            # if no error was found in the upload session closure request,
            # we still have to write the server side log
            if no_error:
                self.write_server_side_log(self._log_data)
        # close client-side log handle
        self._handle.close()
        if message is not None:
            print("\n%s\n" % message)
        if self._testing:
            return False
        else:
            exit()

    def upload(self):
        """
        Calls methods that send HTTP requests to the upload server.
        """

        # Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
        try:

            # get upload session, check response for error key
            upload_session_data = self.get_upload_session_id()
            no_error = self.check_response_for_error_key(upload_session_data)

            # if there was an error and we're testing, return False for the testing module
            if not no_error and self._testing:
                return False

            self.upload_session_id = upload_session_data["id"]
            self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id, output.DEBUG)
            self.server_side_log_file = upload_session_data["log_file"]

        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
            self._outputter.write(traceback.format_exc(), output.ERROR)

            if not self._verbose:
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
        if self.data_to_send["fcsr_filter"] is not None:
            # FCSR filtering: filter the IOVs before we send them by getting the First Conditions Safe Run
            # from the server, based on the target synchronization type.
            if self.data_to_send["inputTagData"]["time_type"] != "Time":
                # if we have a time-based tag, we can't do FCSR validation - this is also the case on the server side
                try:
                    self.filter_iovs_by_fcsr(self.upload_session_id)
                    # this function does not return a value, since it just operates on data - so no point checking for an error key
                    # the error key check is done inside the function on the response from the server
                except errors.NoMoreRetriesException as no_more_retries:
                    return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
                except Exception as e:
                    # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
                    self._outputter.write(traceback.format_exc(), output.ERROR)

                    if not self._verbose:
                        self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
                    else:
                        self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

                    return self.exit_upload()
            else:
                self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")

        # Check for the hashes that the server doesn't have - only send these (but in the next step).
        try:

            check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
            # check for an error key in the response
            no_error = self.check_response_for_error_key(check_hashes_response)

            # if there was an error and we're testing, return False for the testing module
            if not no_error and self._testing:
                return False

            # finally, compare hashes_not_found with the hashes not found locally - if there is an intersection, we stop the upload,
            # because if a hash is found neither locally nor on the server, there is no data to upload
            all_hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
            hashes_not_found = check_hashes_response["hashes_not_found"]
            hashes_found = list(set(all_hashes) - set(hashes_not_found))
            self._outputter.write("Checking for IOVs that have no Payload locally or on the server.", output.VERBOSE)
            # check if any hash not found on the server is also missing from the local SQLite database
            for hash_not_found in hashes_not_found:
                if hash_not_found in self.hashes_with_no_local_payload:
                    return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)

            for hash_found in hashes_found:
                if hash_found in self.hashes_with_no_local_payload:
                    self._outputter.write("Payload with hash %s found on the server, so the IOV can be uploaded." % hash_found, output.VERBOSE)

            self._outputter.write("Found %i Payload(s) on the remote server." % len(hashes_found), output.INFO)
            self._outputter.write("Found %i Payload(s) not on the remote server." % len(hashes_not_found), output.INFO)

            self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.", output.VERBOSE)

        except errors.NoMoreRetriesException as no_more_retries:
            # for now, just write the log if we get a NoMoreRetriesException
            return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
            self._outputter.write(traceback.format_exc(), output.ERROR)

            if not self._verbose:
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # Send the payloads the server told us about in the previous step (returned from get_hashes_to_send).
        # Exception handling is done inside that method, since it calls a method itself for each payload.
        send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
        if self._testing and not send_payloads_response:
            return False

        # Final stage - send metadata to the server (since the payloads are there now).
        # If this is successful, once it has finished, the upload session is closed on the server and the tag lock is released.
        try:

            # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
            send_metadata_response = self.send_metadata(self.upload_session_id)

            no_error = self.check_response_for_error_key(send_metadata_response)
            if not no_error and self._testing:
                return False

            try:
                self._outputter.write(send_metadata_response["summary"], output.INFO)
            except KeyError:
                pass

            # we have to call this explicitly here since check_response_for_error_key only writes the log file
            # if an error has occurred, whereas it should always be written here
            self.write_server_side_log(self._log_data)

        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
            self._outputter.write(traceback.format_exc(), output.ERROR)

            if not self._verbose:
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # close client side log handle
        self._handle.close()

        # if we're running the testing script, return True to say the upload has worked
        if self._testing:
            return True

    @check_response(check="json")
    def get_upload_session_id(self):
        """
        Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
        as long as the upload session is still open.
        """
        self._outputter.write("Getting upload session.", output.VERBOSE)

        # send password in the body so it can be encrypted over https
        # username and password are taken from the netrc file
        # at this point, the value in username_or_token is always a username, since
        # this method's end result is obtaining a token.
        # json.dumps gives a str, which must be encoded to bytes before base64-encoding in Python 3
        body_data = base64.b64encode(json.dumps(
            {
                "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
                "username_or_token" : self.data_to_send["username"],
                "password" : self.data_to_send["password"]
            }
        ).encode("utf-8"))

        url_data = {"database" : self.data_to_send["destinationDatabase"]}

        query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
        response = query.send()
        return response

    @check_response(check="json")
    def close_upload_session(self, upload_session_id):
        """
        Close an upload session on the server by calling its close_upload_session end-point.
        This is done if there is an error on the client side.
        """
        self._outputter.write("An error occurred - closing the upload session on the server.")
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
        response = query.send()
        return response

    @check_response(check="json")
    def get_fcsr_from_server(self, upload_session_id):
        """
        Execute the HTTPS request to ask the server for the FCSR.

        Note: we do this in a separate function so we can do the decoding check for json data with check_response.
        """
        # tiny amount of client-side logic here - all of the work is done on the server
        url_data = {
            "database" : self.data_to_send["destinationDatabase"],
            "upload_session_id" : upload_session_id,
            "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
            "sourceTagSync" : self.data_to_send["fcsr_filter"]
        }
        query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
        result = query.send()
        return result

    def filter_iovs_by_fcsr(self, upload_session_id):
        """
        Ask the server for the FCSR based on the synchronization type of the source Tag.
        Then, modify the IOVs (possibly removing some) based on the FCSR we received.
        This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
        """
        self._outputter.write("Getting the First Condition Safe Run for the current sync type.")

        fcsr_data = self.get_fcsr_from_server(upload_session_id)
        fcsr = fcsr_data["fcsr"]
        fcsr_changed = fcsr_data["fcsr_changed"]
        new_sync = fcsr_data["new_sync"]

        if fcsr_changed:
            self._outputter.write("Synchronization '%s' given was changed to '%s' to match the destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))

        self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR filtering."
                              % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))

        # There may be cases where this assumption is not correct (that we can reassign the since if fcsr > since).
        # Only set the since to the FCSR from the server if the FCSR is further along than the since the user is trying to upload to.
        # Note: this applies to run, lumi and timestamp time_types.

        # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
        if fcsr > self.data_to_send["since"]:
            # check if we're uploading to offline sync - if so, then the user since must be >= fcsr, so we should report an error
            if self.data_to_send["fcsr_filter"].lower() == "offline":
                self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
                self.exit_upload()
            self.data_to_send["since"] = fcsr

        self._outputter.write("Final since after comparison with the FCSR received from the server is %d."
                              % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))

        # Post-validation processing, assuming the destination since is now valid.
        #
        # Because we don't have an sqlite database to query (everything's in a dictionary),
        # we have to go through the IOVs manually to find the greatest since that's less than
        # the destination since.
        #
        # Purpose of this algorithm: move any IOV sinces that we can up to the FCSR without leaving a hole in the Conditions coverage.

        max_since_below_dest = self.data_to_send["iovs"][0]["since"]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
                max_since_below_dest = self.data_to_send["iovs"][i]["since"]

        # only select IOVs that have sinces >= max_since_below_dest,
        # and then shift any IOVs that are left up to the destination since
        self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
                self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]

        # modify insertion_time of the IOVs
        new_time = to_timestamp(datetime.now())
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            self.data_to_send["iovs"][i]["insertion_time"] = new_time

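    # Worked example (illustrative numbers) of the shifting above, with destination since 105:
    #   IOVs at sinces [90, 100, 110, 120]  ->  max_since_below_dest = 100
    #   keep sinces >= 100                  ->  [100, 110, 120]
    #   raise sinces < 105 up to 105        ->  [105, 110, 120]
    # The destination since is covered, and no gap opens up before the IOV at 110.
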
    def get_all_hashes(self):
        """
        Get all the hashes from the dictionary of IOVs we have from the SQLite file.
        """
        self._outputter.write("\tGetting list of all hashes found in SQLite database.", output.DEBUG)
        # use a list comprehension rather than map() so len() works on the result in Python 3
        hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
        self._outputter.write("Found %i local Payload(s) referenced in IOVs" % len(hashes), output.INFO)
        return hashes

    @check_response(check="json")
    def get_hashes_to_send(self, upload_session_id):
        """
        Get the hashes of the payloads we want to send that the server doesn't have yet.
        """
        self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.", output.DEBUG)
        post_data = json.dumps(self.get_all_hashes())
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
        response = query.send()
        return response

    def send_payloads(self, hashes, upload_session_id):
        """
        Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
        """
        # if we have no hashes, we can't send anything,
        # but don't exit, since it might mean all the Payloads were already on the server
        if len(hashes) == 0:
            self._outputter.write("No payloads to send - moving to IOV upload.")
            return True
        else:
            self._outputter.write("Sending payloads of hashes not found:")
            # construct connection string for local SQLite database file
            database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if isinstance(self.sqlite_file_name, str) else self.sqlite_file_name
            # create CondDBFW connection that maps blobs - we need to query for payload BLOBs (disabled by default in CondDBFW)
            self._outputter.write("\tConnecting to input SQLite database.")
            con = querying.connect(database, map_blobs=True)

            # query for the Payloads
            self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
            payloads = con.payload(hash=hashes)
            # if we get a single Payload back, put it in a list and turn it into a json_list
            if payloads.__class__ != data_sources.json_list:
                payloads = data_sources.json_data_node.make([payloads])

            # close the session with the SQLite database file - we won't use it again
            con.close_session()

            # if we found some Payloads, send them
            if payloads:
                # Note: there is an edge case in which the SQLite file could have been modified
                # to delete the Payloads since we queried it for IOV hashes. This may be handled in the next iteration.
                # send http post with data blob in body, and everything else as URL parameters
                # convert Payload to a dictionary - we can put most of this into the URL of the HTTPS request
                dicts = payloads.as_dicts()
                self._outputter.write("Uploading Payload BLOBs:")

                # for each payload, send the BLOB to the server
                for n, payload in enumerate(dicts):
                    self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n + 1, len(dicts), payload["hash"]))
                    response = self.send_blob(payload, upload_session_id)
                    # check response for errors
                    no_error = self.check_response_for_error_key(response, exit_if_error=True)
                    if not no_error:
                        return False
                    self._outputter.write("\tPayload sent - moving to next one.")
                self._outputter.write("All Payloads uploaded.")
                return True
            else:
                return False

    @check_response(check="json")
    def send_blob(self, payload, upload_session_id):
        """
        Send the BLOB of a payload over HTTP.
        The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
        """
        # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPS request
        blob_data = base64.b64encode(payload["data"])

        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}

        # construct the data to send in the body and header of the HTTPS request
        for key in payload.keys():
            # skip the blob
            if key != "data":
                if key == "insertion_time":
                    url_data[key] = to_timestamp(payload[key])
                else:
                    url_data[key] = payload[key]

        request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)

        # send the request and return the response
        # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
        try:
            request_response = request.send()
            return request_response
        except Exception as e:
            # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
            if isinstance(e, errors.NoMoreRetriesException):
                self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
            return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})

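    # Illustrative example (field names as used above; the values are invented) of how
    # a payload row is split across the request: everything except the BLOB travels as
    # URL parameters, while the BLOB goes base64-encoded in the body.
    #   url_data = {"hash" : "4d...", "object_type" : "ExamplePayload",
    #               "insertion_time" : "2020-01-01 00:00:00,000",
    #               "database" : ..., "upload_session_id" : ...}
    #   body     = base64.b64encode(payload["data"])
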
    @check_response(check="json")
    def send_metadata(self, upload_session_id):
        """
        Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
        The server closes the session (and releases the tag lock) after processing has been completed.
        """

        # set the user text if it's empty
        if self.data_to_send["userText"] in ["", None]:
            self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % list(self.data_to_send["destinationTags"].keys())[0]

        self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."
                              % self.upload_session_id, output.VERBOSE)

        # send the HTTPS request to the server
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
        response = request.send()
        self._outputter.write("Response received - conditions upload process complete.", output.VERBOSE)
        return response

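# Minimal end-to-end usage sketch (illustrative only: the keyword names mirror the
# constructor and __main__ block above, but the tag, database, and credential values
# are invented, and no real upload is attempted here).
def _demo_uploader():
    metadata = data_sources.json_data_node.make({
        "destinationTags" : {"ExampleTag_v1" : {}},
        "destinationDatabase" : "oracle://cms_orcoff_prep/CMS_CONDITIONS",
        "sourceDB" : "conditions.db",
        "inputTag" : "ExampleTag_v1",
        "since" : None,
        "fcsr_filter" : None,
        "userText" : "demo upload",
        "username" : "user",
        "password" : "pass",
    })
    # testing=True makes failures return False instead of calling exit()
    controller = uploader(metadata_source=metadata, verbose=True, testing=True)
    return controller.upload()
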
if __name__ == "__main__":
    """
    This code should only be executed for testing.
    """
    import sys
    from .uploadConditions import parse_arguments

    print(
"""
This code should only be executed for testing.
Any uploads done by the user should be done by calling the uploadConditions.py script.
See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
"""
    )

    upload_metadata = parse_arguments()

    upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB")

    # make a new dictionary, and copy over everything except "metadata_source"
    upload_metadata_argument = {}
    for (key, value) in upload_metadata.items():
        if key != "metadata_source":
            upload_metadata_argument[key] = value

    upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument)

    upload_controller = uploader(**upload_metadata)

    result = upload_controller.upload()