CMS 3D CMS Logo

uploads.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 """
3 
4 Joshua Dawes - CERN, CMS - The University of Manchester
5 
6 This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.
7 
8 """
9 
10 import os
11 import json
12 import base64
13 from datetime import datetime
14 from urllib import urlencode
15 import math
16 import sys
17 import traceback
18 import netrc
19 
20 from url_query import url_query
21 import models
22 import errors
23 import data_sources
24 import querying
25 from errors import *
26 from utils import to_timestamp, to_datetime, friendly_since
27 
def friendly_since(time_type, since):
    """
    Takes a since and, if it is Run-based expressed as Lumi-based, returns the run number.
    Otherwise, returns the since without transformations.

    A Lumi-based since packs the run number into the upper 32 bits and the
    lumi-section into the lower 32 bits (run-based sinces are converted with
    ``since << 32``, i.e. lumi-section 0). A Run-based since expressed as
    Lumi-based therefore has all 32 lower bits equal to zero.

    Note: this module-level definition replaces the ``friendly_since`` imported
    from ``utils`` at the top of the file.

    :param time_type: time type of the Tag, e.g. "Run", "Lumi" or "Time".
    :param since: the since value to convert.
    :returns: the run number for a Run-based since with lumi-section 0,
        otherwise ``since`` unchanged.
    """
    # Mask the whole 32-bit lumi-section field. A 24-bit mask would wrongly treat
    # lumi-sections that are non-zero multiples of 2**24 as "no lumi-section".
    if time_type == "Run" and (since & 0xffffffff) == 0:
        return since >> 32
    return since
37 
# Deliberately minimal: if logging requirements change, this can be replaced by a dedicated logging class.
def log(file_handle, message):
    """
    Append one timestamped line to a client-side log (used by the output class).

    :param file_handle: writable file-like object that receives the line.
    :param message: text to record; it is prefixed with the current time,
        formatted by ``to_timestamp``, in square brackets.
    """
    stamp = to_timestamp(datetime.now())
    line = "[%s] %s\n" % (stamp, message)
    file_handle.write(line)
44 
def new_log_file_id():
    """
    Find an id for a new client-side log file in ./upload_logs.

    Note: This cannot use the upload session token since logs need to be written before the session is opened.
    However, this can be changed so that the filename that uses the token is written to once
    it is obtained.

    The id is one greater than the largest id already used by an
    ``upload_log_<id>`` file. Counting the files instead would reuse an existing
    id whenever an older log has been deleted, and the caller opens the file in
    append mode, so two uploads would share one log. The ``upload_logs``
    directory is created if needed, because the caller opens a file inside it
    straight away.

    :returns: a positive integer id (1 when there are no existing logs).
    """
    log_directory = os.path.join(os.getcwd(), "upload_logs")
    if not os.path.isdir(log_directory):
        try:
            os.makedirs(log_directory)
        except OSError:
            # another process may have created it in the meantime
            if not os.path.isdir(log_directory):
                raise

    prefix = "upload_log_"
    used_ids = [0]
    for file_name in os.listdir(log_directory):
        if file_name.startswith(prefix):
            suffix = file_name[len(prefix):]
            if suffix.isdigit():
                used_ids.append(int(suffix))
    return max(used_ids) + 1
58 
class output():
    """
    Used to control output to the console and to the client-side log.
    """

    def __init__(self, log_handle=None, verbose=False):
        # nothing has been written yet, so a progress bar would not need to go back along the line
        self.current_output_length = 0
        self._verbose = verbose
        self._log_handle = log_handle

    def write(self, message="", ignore_verbose=False):
        """
        Write a message to the console and to the log file held by self.

        The message is printed when ``ignore_verbose`` is set or this object was
        created with ``verbose=True``. It is written to the log whenever a log
        handle was supplied, regardless of verbosity.
        """
        if ignore_verbose or self._verbose:
            print(message)
        if self._log_handle is not None:
            log(self._log_handle, message)
80 
class uploader(object):
    """
    Upload session controller - creates, tracks, and deletes upload sessions on the server.
    """
85 
86  def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
87  """
88  Upload constructor:
89  Given an SQLite file and a Metadata sources, reads into a dictionary read for it to be encoded and uploaded.
90 
91  Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.
92 
93  Note: default value of service_url should be changed for production.
94  """
95  # set private variables
96  self._debug = debug
97  self._verbose = verbose
98  self._testing = testing
99  # initialise server-side log data as empty string - will be replaced when we get a response back from the server
100  self._log_data = ""
101  self._SERVICE_URL = server
102  self.upload_session_id = None
103 
104  # set up client-side log file
105  self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
106  self._handle = open(self.upload_log_file_name, "a")
107 
108  # set up client-side logging object
109  self._outputter = output(verbose=verbose, log_handle=self._handle)
110  self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)
111 
112  # expect a CondDBFW data_source object for metadata_source
113  if metadata_source == None:
114  # no upload metadat has been given - we cannot continue with the upload
115  self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
116  else:
117  # set up global metadata source variable
118  self.metadata_source = metadata_source.data()
119 
120  # check for the destination tag
121  # this is required whatever type of upload we're performing
122  if self.metadata_source.get("destinationTags") == None:
123  self.exit_upload("No destination Tag was given.")
124  else:
125  if isinstance(self.metadata_source.get("destinationTags"), dict) and self.metadata_source.get("destinationTags").keys()[0] == None:
126  self.exit_upload("No destination Tag was given.")
127 
128  # make sure a destination database was given
129  if self.metadata_source.get("destinationDatabase") == None:
130  self.exit_upload("No destination database was given.")
131 
132  # get Conditions metadata
133  if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
134  """
135  If we have neither an sqlite file nor the command line data
136  """
137  self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
138  + "\nSee --help for command line argument information.")
139  elif self.metadata_source.get("sourceDB") != None:
140  """
141  We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source
142  We now extract the Tag and IOV data from SQLite. It is added to the dictionary for sending over HTTPs later.
143  """
144 
145  # make sure we have an input tag to look for in the source db
146  self.input_tag = metadata_source.data().get("inputTag")
147  if self.input_tag == None:
148  self.exit_upload("No input Tag name was given.")
149 
150  # set empty dictionary to contain Tag and IOV data from SQLite
151  result_dictionary = {}
152  self.sqlite_file_name = self.metadata_source["sourceDB"]
153  if not(os.path.isfile(self.sqlite_file_name)):
154  self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
155  sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))
156 
157  self._outputter.write("Getting Tag and IOVs from SQLite database.")
158 
159  # query for Tag, check for existence, then convert to dictionary
160  tag = sqlite_con.tag(name=self.input_tag)
161  if tag == None:
162  self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
163  tag = tag.as_dicts(convert_timestamps=True)
164 
165  # query for IOVs, check for existence, then convert to dictionaries
166  iovs = sqlite_con.iov(tag_name=self.input_tag)
167  if iovs == None:
168  self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
169  iovs = iovs.as_dicts(convert_timestamps=True)
170  iovs = [iovs] if not isinstance(iovs, list) else iovs
171 
172  """
173  Finally, get the list of all Payload hashes of IOVs,
174  then compute the list of hashes for which there is no Payload for
175  this is used later to decide if we can continue the upload if the Payload was not found on the server.
176  """
177  iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
178  if iovs_for_hashes.__class__ == data_sources.json_list:
179  hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
180  else:
181  hashes_of_iovs = [iovs_for_hashes.payload_hash]
182  self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]
183 
184  # close session open on SQLite database file
185  sqlite_con.close_session()
186 
187  elif metadata_source.data().get("hashToUse") != None:
188  """
189  Assume we've been given metadata in the command line (since no sqlite file is there, and we have command line arguments).
190  We now use Tag and IOV data from command line. It is added to the dictionary for sending over HTTPs later.
191  """
192 
193  # set empty dictionary to contain Tag and IOV data from command line
194  result_dictionary = {}
195 
196  now = to_timestamp(datetime.now())
197  # tag dictionary will be taken from the server
198  # this does not require any authentication
199  tag = self.get_tag_dictionary()
201  iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
202  "insertion_time" : now}]
203 
204  # hashToUse cannot be stored locally (no sqlite file is given), so register it as not found
205  self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]
206 
207  # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side
208 
209  # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
210  if tag["time_type"] == "Run":
211  for (i, iov) in enumerate(iovs):
212  iovs[i]["since"] = iovs[i]["since"] << 32
213 
214  result_dictionary = {"inputTagData" : tag, "iovs" : iovs}
215 
216  # add command line arguments to dictionary
217  # remembering that metadata_source is a json_dict object
218  result_dictionary.update(metadata_source.data())
219 
220  # store in instance variable
221  self.data_to_send = result_dictionary
222 
223  # if the since doesn't exist, take the first since from the list of IOVs
224  if result_dictionary.get("since") == None:
225  result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
226  elif self.data_to_send["inputTagData"]["time_type"] == "Run":
227  # Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
228  self.data_to_send["since"] = self.data_to_send["since"] << 32
229 
230  """
231  TODO - Settle on a single destination tag format.
232  """
233  # look for deprecated metadata entries - give warnings
234  # Note - we only really support this format
235  try:
236  if isinstance(result_dictionary["destinationTags"], dict):
237  self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
238  except Exception as e:
239  self._outputter.write("ERROR: %s" % str(e))
240 
241  @check_response(check="json")
243  url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
244  request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
245  response = request.send()
246  return response
247 
248  def check_response_for_error_key(self, response_dict, exit_if_error=True):
249  """
250  Checks the decoded response of an HTTP request to the server.
251  If it is a dictionary, and one of its keys is "error", the server returned an error
252  """
253  # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
254  if isinstance(response_dict, dict) and "error" in response_dict.keys():
255  splitter_string = "\n%s\n" % ("-"*50)
256  self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
257  self._outputter.write(response_dict["error"], ignore_verbose=True)
258 
259  # if the user has given the --debug flag, show the traceback as well
260  if self._debug:
261  # suggest to the user to email this to db upload experts
262  self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
263  if response_dict.get("traceback") != None:
264  self._outputter.write(response_dict["traceback"], ignore_verbose=True)
265  else:
266  self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
267  else:
268  self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)
269 
270  # write server side log to client side (if we have an error from creating an upload session, the log is in its initial state (""))
271  # if an error has occurred on the server side, a log will have been written
272  self.write_server_side_log(response_dict.get("log_data"))
273 
274  if exit_if_error:
275  if self._testing:
276  return False
277  else:
278  exit()
279  elif not("error" in response_dict.keys()) and "log_data" in response_dict.keys():
280  # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
281  self._log_data = response_dict["log_data"]
282  return True
283 
284  def write_server_side_log(self, log_data):
285  """
286  Given the log data from the server, write it to a client-side log file.
287  """
288  # if the server_side_log directory doesn't exist, create it
289  # without it we can't write the log when we download it from the server
290  if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
291  os.makedirs("server_side_logs/")
292 
293  # directory exists now, write to client-side log file
294  server_log_file_name = None
295  try:
296  # if the upload session does not exist yet, don't try to write the log file
297  if self.upload_session_id == None:
298  raise Exception("No upload session")
299  # create a write handle to the file, decode the log data from base64, write and close
300  server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
301  handle = open(server_log_file_name, "w")
302  handle.write(base64.b64decode(log_data))
303  handle.close()
304  except Exception as e:
305  # reset log file name to None so we don't try to write it later
306  server_log_file_name = None
307  #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")
308 
309  # tell the user where the log files are
310  # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
311  if server_log_file_name != None:
312  print("Log file from server written to '%s'." % server_log_file_name)
313  else:
314  print("No server log file could be written locally.")
315 
316  print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)
317 
318  def exit_upload(self, message=None):
319  """
320  Used to exit the script - which only happens if an error has occurred.
321  If the --testing flag was passed by the user, we should return False for failure, and not exit
322  """
323  if self.upload_session_id != None:
324  # only try to close the upload session if an upload session has been obtained
325  response = self.close_upload_session(self.upload_session_id)
326  no_error = self.check_response_for_error_key(response)
327  # if no error was found in the upload session closure request,
328  # we still have to write the server side log
329  if no_error:
331  # close client-side log handle
332  self._handle.close()
333  if message != None:
334  print("\n%s\n" % message)
335  if self._testing:
336  return False
337  else:
338  exit()
339 
340  def upload(self):
341  """
342  Calls methods that send HTTP requests to the upload server.
343  """
344 
345  """
346  Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
347  """
348  try:
349 
350  # get upload session, check response for error key
351  upload_session_data = self.get_upload_session_id()
352  no_error = self.check_response_for_error_key(upload_session_data)
353 
354  # if there was an error and we're testing, return False for the testing module
355  if not(no_error) and self._testing:
356  return False
357 
358  self.upload_session_id = upload_session_data["id"]
359  self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
360  self.server_side_log_file = upload_session_data["log_file"]
361 
362  except errors.NoMoreRetriesException as no_more_retries:
363  return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
364  except Exception as e:
365  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
366  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
367 
368  if not(self._verbose):
369  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
370  else:
371  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
372 
373  return self.exit_upload()
374 
375  """
376  Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
377  """
378  if self.data_to_send["fcsr_filter"] != None:
379  """
380  FCSR Filtering:
381  Filtering the IOVs before we send them by getting the First Conditions Safe Run
382  from the server based on the target synchronization type.
383  """
384  if self.data_to_send["inputTagData"]["time_type"] != "Time":
385  # if we have a time-based tag, we can't do FCSR validation - this is also the case on the server side
386  try:
388  # this function does not return a value, since it just operates on data - so no point checking for an error key
389  # the error key check is done inside the function on the response from the server
390  except errors.NoMoreRetriesException as no_more_retries:
391  return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
392  except Exception as e:
393  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
394  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
395 
396  if not(self._verbose):
397  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
398  else:
399  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
400 
401  return self.exit_upload()
402  else:
403  self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")
404 
405  """
406  Check for the hashes that the server doesn't have - only send these (but in the next step).
407  """
408  try:
409 
410  check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
411  # check for an error key in the response
412  no_error = self.check_response_for_error_key(check_hashes_response)
413 
414  # if there was an error and we're testing, return False for the testing module
415  if not(no_error) and self._testing:
416  return False
417 
418  # finally, check hashes_not_found with hashes not found locally - if there is an intersection, we stop the upload
419  # because if a hash is not found and is not on the server, there is no data to upload
420  all_hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
421  hashes_not_found = check_hashes_response["hashes_not_found"]
422  hashes_found = list(set(all_hashes) - set(hashes_not_found))
423  self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
424  # check if any hashes not found on the server is used in the local SQLite database
425  for hash_not_found in hashes_not_found:
426  if hash_not_found in self.hashes_with_no_local_payload:
427  return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)
428 
429  for hash_found in hashes_found:
430  if hash_found in self.hashes_with_no_local_payload:
431  self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)
432 
433  self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")
434 
435  except errors.NoMoreRetriesException as no_more_retries:
436  # for now, just write the log if we get a NoMoreRetriesException
437  return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
438  except Exception as e:
439  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
440  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
441 
442  if not(self._verbose):
443  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
444  else:
445  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
446 
447  return self.exit_upload()
448 
449  """
450  Send the payloads the server told us about in the previous step (returned from get_hashes_to_send)
451  exception handling is done inside this method, since it calls a method itself for each payload.
452  """
453  send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
454  if self._testing and not(send_payloads_response):
455  return False
456 
457  """
458  Final stage - send metadata to server (since the payloads are there now)
459  if this is successful, once it finished the upload session is closed on the server and the tag lock is released.
460  """
461  try:
462 
463  # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
464  send_metadata_response = self.send_metadata(self.upload_session_id)
465  no_error = self.check_response_for_error_key(send_metadata_response)
466  if not(no_error) and self._testing:
467  return False
468 
469  # we have to call this explicitly here since check_response_for_error_key only writes the log file
470  # if an error has occurred, whereas it should always be written here
472 
473  except errors.NoMoreRetriesException as no_more_retries:
474  return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
475  except Exception as e:
476  # something went wrong that we have no specific exception for, so just exit and output the traceback if --debug is set.
477  self._outputter.write(traceback.format_exc(), ignore_verbose=True)
478 
479  if not(self._verbose):
480  self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
481  else:
482  self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")
483 
484  return self.exit_upload()
485 
486  # close client side log handle
487  self._handle.close()
488 
489  # if we're running the testing script, return True to say the upload has worked
490  if self._testing:
491  return True
492 
493  @check_response(check="json")
495  """
496  Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
497  as long as the upload session is still open.
498  """
499  self._outputter.write("Getting upload session.")
500 
501  # send password in the body so it can be encrypted over https
502  # username and password are taken from the netrc file
503  # at this point, the value in username_or_token is always a username, since
504  # this method's end result is obtaining a token.
505  body_data = base64.b64encode(json.dumps(
506  {
507  "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
508  "username_or_token" : self.data_to_send["username"],
509  "password" : self.data_to_send["password"]
510  }
511  ))
512 
513  url_data = {"database" : self.data_to_send["destinationDatabase"]}
514 
515  query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
516  response = query.send()
517  return response
518 
519  @check_response(check="json")
520  def close_upload_session(self, upload_session_id):
521  """
522  Close an upload session on the server by calling its close_upload_session end-point.
523  This is done if there is an error on the client-side.
524  """
525  self._outputter.write("An error occurred - closing the upload session on the server.")
526  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
527  query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
528  response = query.send()
529  return response
530 
531  @check_response(check="json")
532  def get_fcsr_from_server(self, upload_session_id):
533  """
534  Execute the HTTPs request to ask the server for the FCSR.
535 
536  Note: we do this in a separate function we so we can do the decoding check for json data with check_response.
537  """
538  # tiny amount of client-side logic here - all of the work is done on the server
539  url_data = {
540  "database" : self.data_to_send["destinationDatabase"],
541  "upload_session_id" : upload_session_id,
542  "destinationTag" : self.data_to_send["destinationTags"].keys()[0],
543  "sourceTagSync" : self.data_to_send["fcsr_filter"]
544  }
545  query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
546  result = query.send()
547  return result
548 
549  def filter_iovs_by_fcsr(self, upload_session_id):
550  """
551  Ask for the server for the FCSR based on the synchronization type of the source Tag.
552  Then, modify the IOVs (possibly remove some) based on the FCSR we received.
553  This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
554  """
555  self._outputter.write("Getting the First Condition Safe Run for the current sync type.")
556 
557  fcsr_data = self.get_fcsr_from_server(upload_session_id)
558  fcsr = fcsr_data["fcsr"]
559  fcsr_changed = fcsr_data["fcsr_changed"]
560  new_sync = fcsr_data["new_sync"]
561 
562  if fcsr_changed:
563  self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))
564 
565  self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
566  % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))
567 
568  """
569  There may be cases where this assumption is not correct (that we can reassign since if fcsr > since)
570  Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to
571  Note: this applies to run, lumi and timestamp run_types.
572  """
573 
574  # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
575  if fcsr > self.data_to_send["since"]:
576  # check if we're uploading to offline sync - if so, then user since must be >= fcsr, so we should report an error
577  if self.data_to_send["fcsr_filter"].lower() == "offline":
578  self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
579  self.exit_upload()
580  self.data_to_send["since"] = fcsr
581 
582  self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
583  % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))
584 
585  """
586  Post validation processing assuming destination since is now valid.
587 
588  Because we don't have an sqlite database to query (everything's in a dictionary),
589  we have to go through the IOVs manually find the greatest since that's less than
590  the destination since.
591 
592  Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr without leaving a hole in the Conditions coverage
593  """
594 
595  max_since_below_dest = self.data_to_send["iovs"][0]["since"]
596  for (i, iov) in enumerate(self.data_to_send["iovs"]):
597  if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
598  max_since_below_dest = self.data_to_send["iovs"][i]["since"]
599 
600  # only select iovs that have sinces >= max_since_below_dest
601  # and then shift any IOVs left to the destination since
602  self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
603  for (i, iov) in enumerate(self.data_to_send["iovs"]):
604  if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
605  self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]
606 
607  # modify insertion_time of iovs
608  new_time = to_timestamp(datetime.now())
609  for (i, iov) in enumerate(self.data_to_send["iovs"]):
610  self.data_to_send["iovs"][i]["insertion_time"] = new_time
611 
612  def get_all_hashes(self):
613  """
614  Get all the hashes from the dictionary of IOVs we have from the SQLite file.
615  """
616  self._outputter.write("\tGetting list of all hashes found in SQLite database.")
617  hashes = map(lambda iov : iov["payload_hash"], self.data_to_send["iovs"])
618  return hashes
619 
620  @check_response(check="json")
621  def get_hashes_to_send(self, upload_session_id):
622  """
623  Get the hashes of the payloads we want to send that the server doesn't have yet.
624  """
625  self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
626  post_data = json.dumps(self.get_all_hashes())
627  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
628  query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
629  response = query.send()
630  return response
631 
632  def send_payloads(self, hashes, upload_session_id):
633  """
634  Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
635  """
636  # if we have no hashes, we can't send anything
637  # but don't exit since it might mean all the Payloads were already on the server
638  if len(hashes) == 0:
639  self._outputter.write("No hashes to send - moving to metadata upload.")
640  return True
641  else:
642  self._outputter.write("Sending payloads of hashes not found:")
643  # construct connection string for local SQLite database file
644  database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if isinstance(self.sqlite_file_name, str) else self.sqlite_file_name
645  # create CondDBFW connection that maps blobs - as we need to query for payload BLOBs (disabled by default in CondDBFW)
646  self._outputter.write("\tConnecting to input SQLite database.")
647  con = querying.connect(database, map_blobs=True)
648 
649  # query for the Payloads
650  self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
651  payloads = con.payload(hash=hashes)
652  # if we get a single Payload back, put it in a list and turn it into a json_list
653  if payloads.__class__ != data_sources.json_list:
654  payloads = data_sources.json_data_node.make([payloads])
655 
656  # close the session with the SQLite database file - we won't use it again
657  con.close_session()
658 
659  # if found some Payloads, send them
660  if payloads:
661  # Note: there is an edge case in which the SQLite file could have been queried
662  # to delete the Payloads since we queried it for IOV hashes. This may be handled in the next iteration.
663  # send http post with data blob in body, and everything else as URL parameters
664  # convert Payload to a dictionary - we can put most of this into the URL of the HTTPs request
665  dicts = payloads.as_dicts()
666  self._outputter.write("Uploading Payload BLOBs:")
667 
668  # for each payload, send the BLOB to the server
669  for n, payload in enumerate(dicts):
670  self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
671  response = self.send_blob(payload, upload_session_id)
672  # check response for errors
673  no_error = self.check_response_for_error_key(response, exit_if_error=True)
674  if not(no_error):
675  return False
676  self._outputter.write("\tPayload sent - moving to next one.")
677  self._outputter.write("All Payloads uploaded.")
678  return True
679  else:
680  return False
681 
682  @check_response(check="json")
683  def send_blob(self, payload, upload_session_id):
684  """
685  Send the BLOB of a payload over HTTP.
686  The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
687  """
688  # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPs request
689  blob_data = base64.b64encode(payload["data"])
690 
691  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
692 
693  # construct the data to send in the body and header of the HTTPs request
694  for key in payload.keys():
695  # skip blob
696  if key != "data":
697  if key == "insertion_time":
698  url_data[key] = to_timestamp(payload[key])
699  else:
700  url_data[key] = payload[key]
701 
702  request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)
703 
704  # send the request and return the response
705  # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
706  try:
707  request_response = request.send()
708  return request_response
709  except Exception as e:
710  # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
711  if isinstance(e, errors.NoMoreRetriesException):
712  self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
713  self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
714  return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})
715 
716  @check_response(check="json")
717  def send_metadata(self, upload_session_id):
718  """
719  Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
720  The server closes the session (and releases the tag lock) after processing has been completed.
721  """
722 
723  # set user text if it's empty
724  if self.data_to_send["userText"] in ["", None]:
725  self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % self.data_to_send["destinationTags"].keys()[0]
726 
727  self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
728  % self.upload_session_id)
729 
730  # sent the HTTPs request to the server
731  url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
732  request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
733  response = request.send()
734  self._outputter.write("Response received - conditions upload process complete.")
735  return response
736 
if __name__ == "__main__":
    # This code should only be executed for testing.
    import sys
    from uploadConditions import parse_arguments

    print(
"""
This code should only be executed for testing.
Any uploads done by the user should be done by calling the uploadConditions.py script.
See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
"""
    )

    upload_metadata = parse_arguments()
    upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB")

    # the metadata source wraps a copy of every argument except "metadata_source" itself
    upload_metadata_argument = {key: value for (key, value) in upload_metadata.items() if key != "metadata_source"}
    upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument)

    upload_controller = uploader(**upload_metadata)
    result = upload_controller.upload()
def to_timestamp(obj)
Definition: utils.py:6
def filter_iovs_by_fcsr(self, upload_session_id)
Definition: uploads.py:549
def __init__(self, log_handle=None, verbose=False)
Definition: uploads.py:64
def get_upload_session_id(self)
Definition: uploads.py:494
hashes_with_no_local_payload
Definition: uploads.py:182
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
Definition: querying.py:450
def check_response_for_error_key(self, response_dict, exit_if_error=True)
Definition: uploads.py:248
def check_response(check="json")
Definition: errors.py:27
def send_payloads(self, hashes, upload_session_id)
Definition: uploads.py:632
S & print(S &os, JobReport::InputFile const &f)
Definition: JobReport.cc:65
def get_fcsr_from_server(self, upload_session_id)
Definition: uploads.py:532
def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", kwargs)
Definition: uploads.py:86
def friendly_since(time_type, since)
Definition: uploads.py:28
def write_server_side_log(self, log_data)
Definition: uploads.py:284
def log(file_handle, message)
Definition: uploads.py:39
current_output_length
Definition: uploads.py:66
def exit_upload(self, message=None)
Definition: uploads.py:318
def close_upload_session(self, upload_session_id)
Definition: uploads.py:520
def write(self, message="", ignore_verbose=False)
Definition: uploads.py:70
def get_all_hashes(self)
Definition: uploads.py:612
def get_hashes_to_send(self, upload_session_id)
Definition: uploads.py:621
def get_tag_dictionary(self)
Definition: uploads.py:242
def upload(self)
Definition: uploads.py:340
def send_blob(self, payload, upload_session_id)
Definition: uploads.py:683
def new_log_file_id()
Definition: uploads.py:45
char data[epos_bytes_allocation]
Definition: EPOS_Wrapper.h:82
#define str(s)
def send_metadata(self, upload_session_id)
Definition: uploads.py:717
T get(const Candidate &c)
Definition: component.h:55
How EventSelector::AcceptEvent() decides whether to accept an event for output otherwise it is excluding the probing of A single or multiple positive and the trigger will pass if any such matching triggers are PASS or EXCEPTION[A criterion thatmatches no triggers at all is detected and causes a throw.] A single negative with an expectation of appropriate bit checking in the decision and the trigger will pass if any such matching triggers are FAIL or EXCEPTION A wildcarded negative criterion that matches more than one trigger in the trigger list("!*","!HLTx*"if it matches 2 triggers or more) will accept the event if all the matching triggers are FAIL.It will reject the event if any of the triggers are PASS or EXCEPTION(this matches the behavior of"!*"before the partial wildcard feature was incorporated).Triggers which are in the READY state are completely ignored.(READY should never be returned since the trigger paths have been run