connection class translates either a connection string for sqlite, oracle of frontier into a connection object.
Also sets up ORM with SQLAlchemy.
connection class can also take a pre-constructed engine - useful for web services.
def querying.connect |
( |
|
connection_data, |
|
|
|
mode = "r" , |
|
|
|
map_blobs = False , |
|
|
|
secrets = None , |
|
|
|
pooling = True |
|
) |
| |
Utility method for user - set up a connection object.
Definition at line 453 of file querying.py.
Referenced by shell.connect(), command_line.copy_global_tag(), command_line.copy_tag(), command_line.diff_of_gts(), command_line.diff_of_tags(), command_line.list_object(), command_line.search(), uploads.uploader.send_payloads(), and payload_tests.payload_tests.test_write_blob_to_sqlite().
453 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
455 Utility method for user - set up a connection object. 457 con = connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)
460 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
◆ engine_from_dictionary()
def querying.engine_from_dictionary |
( |
|
dictionary, |
|
|
|
pooling = True |
|
) |
| |
Definition at line 429 of file querying.py.
Referenced by querying.connection.setup().
430 if dictionary[
"host"] !=
"sqlite":
431 if dictionary[
"host"] !=
"frontier":
434 user = dictionary[
"secrets"][
"login"]
435 pwd = dictionary[
"secrets"][
"password"]
438 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
440 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
445 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
447 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
450 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
def engine_from_dictionary(dictionary, pooling=True)
◆ new_connection_dictionary()
def querying.new_connection_dictionary |
( |
|
connection_data, |
|
|
|
secrets = None , |
|
|
|
mode = "r" |
|
) |
| |
Function used to construct connection data dictionaries - internal to framework.
Definition at line 350 of file querying.py.
References _get_netrc_data(), input, submitPVValidationJobs.split(), and str.
352 Function used to construct connection data dictionaries - internal to framework. 354 frontier_str_length = len(
"frontier://")
355 sqlite_str_length = len(
"sqlite://")
357 oracle_str_length = len(
"oracle://")
359 if type(connection_data)
in [str, str]
and connection_data[0:frontier_str_length] ==
"frontier://":
361 frontier://database_name/schema 363 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
364 schema = connection_data[frontier_str_length:].
split(
"/")[1]
366 connection_data[
"database_name"] = db_name
367 connection_data[
"schema"] = schema
368 connection_data[
"host"] =
"frontier" 369 connection_data[
"secrets"] =
None 370 elif type(connection_data)
in [str, str]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
372 sqlite://database_file_name 375 db_name = connection_data[sqlite_str_length:]
378 connection_data[
"database_name"] = os.path.abspath(db_name)
379 connection_data[
"schema"] = schema
380 connection_data[
"host"] =
"sqlite" 381 connection_data[
"secrets"] =
None 382 elif type(connection_data)
in [str, str]
and connection_data[0:oracle_str_length] ==
"oracle://":
384 oracle://account:password@database_name 386 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) 388 new_connection_string = connection_data[oracle_str_length:]
390 if ":" in new_connection_string:
392 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
393 schema_name = new_connection_string[0:new_connection_string.index(
":")]
395 username = new_connection_string[0:new_connection_string.index(
":")]
396 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
398 mode_to_netrc_key_suffix = {
"r" : "read", "w" : "write"} database_name = new_connection_string[0:new_connection_string.index(
"/")]
399 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
401 username =
str(
input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
402 password =
str(
input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
404 if type(secrets) == str:
405 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
409 username = netrc_data[
"login"]
410 password = netrc_data[
"password"]
411 elif type(secrets) == dict:
412 username = secrets[
"user"]
413 password = secrets[
"password"]
415 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
420 connection_data[
"database_name"] = database_name
421 connection_data[
"schema"] = schema_name
422 connection_data[
"password"] = password
423 connection_data[
"host"] =
"oracle" 424 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
426 return connection_data
428 static std::string const input
def split(sequence, size)
def new_connection_dictionary(connection_data, secrets=None, mode="r")
def _get_netrc_data(netrc_file, key)