connection class translates either a connection string for sqlite, oracle of frontier into a connection object.
Also sets up ORM with SQLAlchemy.
connection class can also take a pre-constructed engine - useful for web services.
def querying.connect |
( |
|
connection_data, |
|
|
|
mode = "r" , |
|
|
|
map_blobs = False , |
|
|
|
secrets = None , |
|
|
|
pooling = True |
|
) |
| |
Utility method for user - set up a connection object.
Definition at line 450 of file querying.py.
Referenced by shell.connect(), command_line.copy_global_tag(), command_line.copy_tag(), command_line.diff_of_gts(), command_line.diff_of_tags(), command_line.list_object(), command_line.search(), uploads.uploader.send_payloads(), and payload_tests.payload_tests.test_write_blob_to_sqlite().
450 def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True):
452 Utility method for user - set up a connection object. 454 con =
connection(connection_data=connection_data, mode=mode, map_blobs=map_blobs, secrets=secrets, pooling=pooling)
456 return con
def connect(connection_data, mode="r", map_blobs=False, secrets=None, pooling=True)
def querying.engine_from_dictionary |
( |
|
dictionary, |
|
|
|
pooling = True |
|
) |
| |
Definition at line 426 of file querying.py.
427 if dictionary[
"host"] !=
"sqlite":
428 if dictionary[
"host"] !=
"frontier":
431 user = dictionary[
"secrets"][
"login"]
432 pwd = dictionary[
"secrets"][
"password"]
435 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6)
437 return create_engine(connection.build_oracle_url(user, pwd, dictionary[
"database_name"]), label_length=6, poolclass=NullPool)
442 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6)
444 return create_engine(connection.build_frontier_url(dictionary[
"database_name"], dictionary[
"schema"]), label_length=6, poolclass=NullPool)
447 return create_engine(
"sqlite:///%s" % dictionary[
"database_name"])
def engine_from_dictionary(dictionary, pooling=True)
def querying.new_connection_dictionary |
( |
|
connection_data, |
|
|
|
secrets = None , |
|
|
|
mode = "r" |
|
) |
| |
Function used to construct connection data dictionaries - internal to framework.
Definition at line 347 of file querying.py.
References _get_netrc_data(), split, and harvestTrackValidationPlots.str.
349 Function used to construct connection data dictionaries - internal to framework. 351 frontier_str_length = len(
"frontier://")
352 sqlite_str_length = len(
"sqlite://")
354 oracle_str_length = len(
"oracle://")
356 if type(connection_data)
in [str, unicode]
and connection_data[0:frontier_str_length] ==
"frontier://":
358 frontier://database_name/schema 360 db_name = connection_data[frontier_str_length:].
split(
"/")[0]
361 schema = connection_data[frontier_str_length:].
split(
"/")[1]
363 connection_data[
"database_name"] = db_name
364 connection_data[
"schema"] = schema
365 connection_data[
"host"] =
"frontier" 366 connection_data[
"secrets"] =
None 367 elif type(connection_data)
in [str, unicode]
and connection_data[0:sqlite_str_length] ==
"sqlite://":
369 sqlite://database_file_name 372 db_name = connection_data[sqlite_str_length:]
375 connection_data[
"database_name"] = os.path.abspath(db_name)
376 connection_data[
"schema"] = schema
377 connection_data[
"host"] =
"sqlite" 378 connection_data[
"secrets"] =
None 379 elif type(connection_data)
in [str, unicode]
and connection_data[0:oracle_str_length] ==
"oracle://":
381 oracle://account:password@database_name 383 oracle://database_name/schema (requires a separate method of authentication - either dictionary or netrc) 385 new_connection_string = connection_data[oracle_str_length:]
387 if ":" in new_connection_string:
389 database_name = new_connection_string[new_connection_string.index(
"@")+1:]
390 schema_name = new_connection_string[0:new_connection_string.index(
":")]
392 username = new_connection_string[0:new_connection_string.index(
":")]
393 password = new_connection_string[new_connection_string.index(
":")+1:new_connection_string.index(
"@")]
395 mode_to_netrc_key_suffix = {
"r" : "read", "w" : "write"}
396 database_name = new_connection_string[0:new_connection_string.index(
"/")]
397 schema_name = new_connection_string[new_connection_string.index(
"/")+1:]
399 username =
str(raw_input(
"Enter the username you want to connect to the schema '%s' with: " % (schema_name)))
400 password =
str(raw_input(
"Enter the password for the user '%s' in database '%s': " % (username, database_name)))
402 if type(secrets) == str:
403 netrc_key =
"%s/%s/%s" % (database_name, schema_name, mode_to_netrc_key_suffix[mode])
407 username = netrc_data[
"login"]
408 password = netrc_data[
"password"]
409 elif type(secrets) == dict:
410 username = secrets[
"user"]
411 password = secrets[
"password"]
413 raise Exception(
"Invalid type given for secrets. Either an str or a dict must be given.")
418 connection_data[
"database_name"] = database_name
419 connection_data[
"schema"] = schema_name
420 connection_data[
"password"] = password
421 connection_data[
"host"] =
"oracle" 422 connection_data[
"secrets"] = {
"login" : username,
"password" : password}
424 return connection_data
def new_connection_dictionary(connection_data, secrets=None, mode="r")
def _get_netrc_data(netrc_file, key)