aind_data_access_api package
Subpackages
Submodules
aind_data_access_api.credentials module
Module to manage credentials to connect to databases.
- class aind_data_access_api.credentials.AWSConfigSettingsSource(settings_cls, config_file_location)¶
Bases: JsonConfigSettingsSource
Class that parses from AWS Secrets Manager.
- class aind_data_access_api.credentials.CoreCredentials(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | None = None, _secrets_dir: PathType | None = None, *, aws_secrets_name: str | None = None, username: str, password: SecretStr, host: str, port: int, database: str | None = None)¶
Bases: BaseSettings
Core credentials for most of our databases.
- aws_secrets_name: str | None¶
- database: str | None¶
- host: str¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- password: SecretStr¶
- port: int¶
- classmethod settings_customise_sources(settings_cls: Type[BaseSettings], init_settings: InitSettingsSource, env_settings: EnvSettingsSource, dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource) Tuple[PydanticBaseSettingsSource, ...]¶
Method to pull configs from a variety of sources, such as a file or AWS. Arguments are required and set by pydantic.
- Parameters:
settings_cls (Type[BaseSettings]) – Top level class. Model fields can be pulled from this.
init_settings (InitSettingsSource) – The settings in the init arguments.
env_settings (EnvSettingsSource) – The settings pulled from environment variables.
dotenv_settings (PydanticBaseSettingsSource) – Settings from .env files. Not currently supported.
file_secret_settings (PydanticBaseSettingsSource) – Settings from secret files, such as those used in Docker. Not currently supported.
- Return type:
Tuple[PydanticBaseSettingsSource, …]
- username: str¶
- class aind_data_access_api.credentials.JsonConfigSettingsSource(settings_cls, config_file_location)¶
Bases: PydanticBaseSettingsSource, ABC
Abstract base class for settings that parse JSON.
- get_field_value(field: FieldInfo, field_name: str) Tuple[Any, str, bool]¶
Gets the value, the key for model creation, and a flag to determine whether the value is complex.
- Parameters:
field (FieldInfo) – The field.
field_name (str) – The field name.
- Returns:
A tuple containing the value, the key, and a flag to determine whether the value is complex.
- Return type:
Tuple[Any, str, bool]
- prepare_field_value(field_name: str, field: FieldInfo, value: Any, value_is_complex: bool) Any¶
Prepares the value of a field.
- Parameters:
field_name (str) – The field name.
field (FieldInfo) – The field.
value (Any) – The value of the field that has to be prepared.
value_is_complex (bool) – A flag to determine whether the value is complex.
- Returns:
The prepared value
- Return type:
Any
aind_data_access_api.document_db module
Module to interface with the DocumentDB
- class aind_data_access_api.document_db.AnalysisDbClient(host: str, collection: str, database: str = 'analysis', version: str = 'v1', boto: Session | None = None, session: Session | None = None)¶
Bases: Client
Class to manage reading and writing to analysis db
- class aind_data_access_api.document_db.Client(host: str, database: str, collection: str, version: str = 'v1', boto: Session | None = None, session: Session | None = None)¶
Bases: object
Class to create a client to interface with DocumentDB via a REST API.
- aggregate_docdb_records(pipeline: List[dict]) List[dict]¶
Aggregate records using an aggregation pipeline.
- property boto: Session¶
Boto3 session
- close()¶
Close the clients.
- delete_many_records(data_asset_record_ids: List[str]) Response¶
Delete many records by their ids
- delete_one_record(data_asset_record_id: str) Response¶
Delete one record by id
- fetch_records_by_filter_list(filter_key: str, filter_values: List[str], projection: dict | None = None) List[dict]¶
Queries DocDB for records where the value of a specified field is in a list of values. Uses an aggregation pipeline with $in filter operator.
- Parameters:
filter_key (str) – The field to filter on.
filter_values (List[str]) – The list of values to filter on.
projection (Optional[dict]) – Subset of fields to return. Default is None which returns all fields.
- Return type:
List[dict]
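The `$in` filter described above can be pictured as a short aggregation pipeline. The sketch below is only an illustration under stated assumptions: `build_in_filter_pipeline` is a hypothetical helper, not part of this package, and the stages the client actually sends may differ.

```python
from typing import List, Optional


def build_in_filter_pipeline(
    filter_key: str,
    filter_values: List[str],
    projection: Optional[dict] = None,
) -> List[dict]:
    """Hypothetical helper: build a $match/$in pipeline (an assumption)."""
    # Match documents whose `filter_key` value is one of `filter_values`.
    pipeline: List[dict] = [{"$match": {filter_key: {"$in": filter_values}}}]
    # Only add a $project stage when a projection was requested.
    if projection is not None:
        pipeline.append({"$project": projection})
    return pipeline


# The field name "subject.subject_id" is an illustrative assumption.
pipeline = build_in_filter_pipeline(
    "subject.subject_id", ["123", "456"], {"name": 1, "_id": 0}
)
```

A pipeline of this shape is the kind of input `aggregate_docdb_records` accepts.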
- insert_one_docdb_record(record: dict) Response¶
Insert one new record
- retrieve_docdb_records(filter_query: dict | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0, paginate: bool | None = None, paginate_batch_size: int | None = None, paginate_max_iterations: int | None = None) List[dict]¶
Retrieve raw json records from DocDB API Gateway as a list of dicts. Queries to the API Gateway are paginated.
- Parameters:
filter_query (Optional[dict]) – Filter to apply to the records being returned. Default is None.
projection (Optional[dict]) – Subset of document fields to return. Default is None.
sort (Optional[dict]) – Sort records when returned. Default is None.
limit (int) – Return a smaller set of records. 0 for all records. Default is 0.
paginate (bool) – (deprecated) If set to true, will batch the queries to the API Gateway.
paginate_batch_size (int) – (deprecated) Number of records to return at a time. Default is 500.
paginate_max_iterations (int) – (deprecated) Max number of iterations to run to prevent indefinite calls to the API Gateway. Default is 20000.
- Return type:
List[dict]
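As a rough usage sketch, the parameters above can be combined in a single call. `newest_assets_for_subject` is a hypothetical wrapper, and the field names (`subject.subject_id`, `created`, `name`, `location`) are assumptions about the record layout rather than anything this reference guarantees.

```python
from typing import Any, List


def newest_assets_for_subject(
    client: Any, subject_id: str, n: int = 5
) -> List[dict]:
    """Return up to `n` records for one subject, newest first.

    `client` is any object exposing `retrieve_docdb_records` with the
    signature documented above. All field names are assumptions.
    """
    return client.retrieve_docdb_records(
        filter_query={"subject.subject_id": subject_id},
        projection={"name": 1, "location": 1, "_id": 0},
        sort={"created": -1},  # assumed field; -1 requests descending order
        limit=n,
    )
```

The deprecated `paginate*` arguments are left at their defaults here.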
- property session: Session¶
Requests session
- upsert_list_of_docdb_records(records: List[dict], max_payload_size: int = 5000000.0) List[Response]¶
Upsert a list of records. There’s a limit to the size of the request that can be sent, so we chunk the requests.
- Parameters:
records (List[dict]) – List of records to upsert into the DocDB database
max_payload_size (int) – Chunk requests into smaller lists no bigger than this value in bytes. If a single record is larger than this value in bytes, an attempt will be made to upsert the record, but the request will most likely receive a 413 status code. The default is 5e6 bytes. The max payload for the API Gateway including headers is 10MB.
- Returns:
A list of responses from the API Gateway.
- Return type:
List[Response]
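The chunking behaviour described above can be sketched as follows. This is a minimal illustration and not the package's implementation: `chunk_by_payload_size` is a hypothetical helper, and it measures each record's JSON size on its own, ignoring the few bytes of list brackets and separators in a real request body.

```python
import json
from typing import Iterator, List


def chunk_by_payload_size(
    records: List[dict], max_payload_size: int
) -> Iterator[List[dict]]:
    """Yield lists of records whose combined JSON size fits the limit."""
    chunk: List[dict] = []
    chunk_size = 0
    for record in records:
        record_size = len(json.dumps(record).encode("utf-8"))
        # Start a new chunk when adding this record would exceed the limit.
        if chunk and chunk_size + record_size > max_payload_size:
            yield chunk
            chunk, chunk_size = [], 0
        # A record larger than the limit still goes out, alone in its chunk.
        chunk.append(record)
        chunk_size += record_size
    if chunk:
        yield chunk


records = [{"_id": str(i), "data": "x" * 40} for i in range(5)]
chunks = list(chunk_by_payload_size(records, max_payload_size=150))
```

An oversized record is deliberately not dropped, which mirrors the documented behaviour of attempting the upsert and letting the gateway reject it.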
- upsert_one_docdb_record(record: dict) Response¶
Upsert one record if the record is not corrupt
- class aind_data_access_api.document_db.MetadataDbClient(host: str, database: str = 'metadata_index', collection: str = 'data_assets', version: str = 'v1', boto: Session | None = None, session: Session | None = None)¶
Bases: Client
Class to manage reading and writing to metadata db
- add_qc_evaluation(data_asset_id: str, qc_contents: Dict[str, Any]) Dict[str, Any]¶
Add one or more QC evaluations (or other QC content) to a data asset.
- deregister_asset(s3_location: str) Dict[str, Any]¶
De-register (delete) a data asset from Code Ocean and remove its metadata from DocDB given that the asset and its metadata are located at the provided S3 location.
- Parameters:
s3_location (str) – The S3 location containing the asset and metadata to be removed.
- Returns:
The response from the deregistration API, including deregistration status and details.
- Return type:
Dict[str, Any]
- generate_data_summary(record_id: str) Dict[str, Any]¶
Get an LLM-generated summary for a data asset with the given record id.
- register_asset(s3_location: str) Dict[str, Any]¶
Register a data asset to Code Ocean and add its metadata to DocDB given the metadata exists at the top level of the provided S3 location.
- Parameters:
s3_location (str) – The S3 location containing the asset and its metadata.
- Returns:
The response from the registration API, including registration status and details.
- Return type:
Dict[str, Any]
- register_co_result(s3_location: str, name: str, co_asset_id: str, co_computation_id: str) Dict[str, Any]¶
Register a Code Ocean result asset and add its metadata to DocDB given the metadata exists at the top level of the Code Ocean computation result.
- Parameters:
s3_location (str) – The S3 location containing the result asset and its metadata.
name (str) – The name of the result asset.
co_asset_id (str) – The Code Ocean asset ID for the result.
co_computation_id (str) – The Code Ocean computation ID associated with the result.
- Returns:
The response from the registration API, including registration status and details.
- Return type:
Dict[str, Any]
- class aind_data_access_api.document_db.SchemaDbClient(host: str, collection: str, database: str = 'schemas', version: str = 'v1', boto: Session | None = None, session: Session | None = None)¶
Bases: Client
Class to manage reading and writing to schemas db
- retrieve_schema_records(schema_version: str | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0) List[dict]¶
Retrieve schema records from DocDB API Gateway as a list of dicts.
- Parameters:
schema_version (Optional[str]) – Schema version to use as a filter_query. Default is None.
projection (Optional[dict]) – Subset of document fields to return. Default is None.
sort (Optional[dict]) – Sort records when returned. Default is None.
limit (int) – Return a smaller set of records. 0 for all records. Default is 0.
- Return type:
List[dict]
aind_data_access_api.document_db_ssh module
Module to interface with the Document Database using SSH tunneling.
- class aind_data_access_api.document_db_ssh.DocumentDbSSHClient(credentials: DocumentDbSSHCredentials)¶
Bases: object
Class to establish a Document Store client with SSH tunneling.
- close()¶
Close the client and SSH tunnel.
- property collection¶
Collection of metadata records in Document Database.
- start()¶
Start the client and SSH tunnel.
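Because `start()` and `close()` come as a pair, it can help to make sure the tunnel is closed even when an error occurs. The sketch below shows one way to do that; `open_ssh_client` is a hypothetical helper, and it only assumes that the object passed in has the `start()` and `close()` methods listed above.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def open_ssh_client(client: Any) -> Iterator[Any]:
    """Start `client`, yield it, and always close it afterwards."""
    client.start()
    try:
        yield client
    finally:
        # Runs on normal exit and when the body raises.
        client.close()
```

With a real client this would be used as `with open_ssh_client(DocumentDbSSHClient(credentials)) as client: ...`, reading records through `client.collection` inside the block.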
- class aind_data_access_api.document_db_ssh.DocumentDbSSHCredentials(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | None = None, _secrets_dir: PathType | None = None, *, aws_secrets_name: str | None = None, username: str, password: SecretStr, host: str, port: int = 27017, database: str = 'metadata_index', collection: str = 'data_assets', ssh_local_bind_address: str = 'localhost', ssh_host: str, ssh_port: int = 22, ssh_username: str, ssh_password: SecretStr)¶
Bases: CoreCredentials
Document Store credentials with SSH tunneling.
- collection: str¶
- database: str¶
- classmethod from_secrets_manager(doc_db_secret_name: str, ssh_secret_name: str)¶
Construct class from AWS Secrets Manager
- Parameters:
doc_db_secret_name (str) – The name of the secret that contains the document store credentials (host, port, username, password).
ssh_secret_name (str) – The name of the secret that contains the ssh credentials (host, username, password).
- host: str¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'DOC_DB_', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- password: SecretStr¶
- port: int¶
- ssh_host: str¶
- ssh_local_bind_address: str¶
- ssh_password: SecretStr¶
- ssh_port: int¶
- ssh_username: str¶
- username: str¶
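The `env_prefix` of `'DOC_DB_'` in `model_config`, together with `case_sensitive` set to `False`, suggests which environment variables can supply these fields. The sketch below lists the names under the assumption that standard pydantic-settings prefix handling applies and that environment variables remain an enabled source. `env_var_names` is a hypothetical helper; the upper-case form is a convention, since matching is case-insensitive.

```python
from typing import Dict, List

# Prefix taken from the model_config shown above.
ENV_PREFIX = "DOC_DB_"

# Fields without a default in the class signature above.
REQUIRED_FIELDS = [
    "username",
    "password",
    "host",
    "ssh_host",
    "ssh_username",
    "ssh_password",
]


def env_var_names(fields: List[str], prefix: str = ENV_PREFIX) -> Dict[str, str]:
    """Map each field name to its conventional environment variable name."""
    return {field: f"{prefix}{field}".upper() for field in fields}


required_env_vars = env_var_names(REQUIRED_FIELDS)
```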
aind_data_access_api.rds_tables module
Module to interface with the Relational Database
- class aind_data_access_api.rds_tables.Client(credentials: RDSCredentials, drivername: str | None = 'postgresql')¶
Bases: object
Class to establish a relational database client. Includes methods to read/write pandas dataframes to backend.
- append_df_to_table(df: DataFrame, table_name: str, dtype: dict | str | None = None) None¶
Append a dataframe to an existing table.
- Parameters:
df (pd.DataFrame) –
table_name (str) –
dtype (Optional[Union[dict, str]]) –
- Return type:
None
- execute_query(query: str) CursorResult¶
Run a SQL query against the database.
- Parameters:
query (str) –
- Returns:
The result of the query.
- Return type:
CursorResult
- overwrite_table_with_df(df: DataFrame, table_name: str, dtype: dict | str | None = None) None¶
Overwrite an existing table with a dataframe.
- Parameters:
df (pd.DataFrame) –
table_name (str) –
dtype (Optional[Union[dict, str]]) –
- Return type:
None
- read_table(table_name: str, where_clause: str | None = None) DataFrame¶
Import sql table as a pandas dataframe.
- Parameters:
table_name (str) –
where_clause (Optional[str]) – If None, this method will pull the entire table. The user can set a custom clause if additional filtering is desired. Default is None.
- Returns:
A pandas dataframe created from the sql table.
- Return type:
pd.DataFrame
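To make the role of `where_clause` concrete, the sketch below shows how such a clause might be combined with the table name. This is an assumption about the general shape of the query, not the package's implementation, and `build_select_query` is a hypothetical helper.

```python
from typing import Optional


def build_select_query(
    table_name: str, where_clause: Optional[str] = None
) -> str:
    """Hypothetical: compose the SELECT statement a read might issue."""
    query = f"SELECT * FROM {table_name}"
    # With no clause, the whole table is selected.
    if where_clause is not None:
        query += f" WHERE {where_clause}"
    return query


# Table and column names here are illustrative only.
query = build_select_query("sessions", "subject_id = '123'")
```

If the clause is interpolated as raw SQL text, as this sketch assumes, it should only come from trusted input.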
- class aind_data_access_api.rds_tables.RDSCredentials(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | None = None, _secrets_dir: PathType | None = None, *, aws_secrets_name: str | None = None, username: str, password: SecretStr, host: str, port: int = 5432, database: str | None = None, dbname: str | None = None)¶
Bases: CoreCredentials
RDS db credentials
- dbname: str | None¶
- host: str¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'RDS_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- password: SecretStr¶
- port: int¶
- username: str¶
- validate_database_name() Self¶
Sets database to dbname.
aind_data_access_api.secrets module
Module to access secrets and parameters
- aind_data_access_api.secrets.get_parameter(parameter_name: str, with_decryption=False) str¶
Retrieves a parameter from AWS Parameter Store.
- Parameters:
parameter_name (str) – The name of the parameter to retrieve.
- aind_data_access_api.secrets.get_secret(secret_name: str) dict¶
Retrieves a secret from AWS Secrets Manager.
- Parameters:
secret_name (str) – The name of the secret to retrieve.
aind_data_access_api.utils module
Package for common methods used for interfacing with DocDB.
- aind_data_access_api.utils.build_docdb_location_to_id_map(docdb_api_client: MetadataDbClient, bucket: str, prefixes: List[str]) Dict[str, str]¶
For a given s3 bucket and list of prefixes, return a dictionary that looks like {‘s3://bucket/prefix’: ‘abc-1234’} where the value is the id of the record in DocDb. If the record does not exist, then there will be no key in the dictionary.
- Parameters:
docdb_api_client (MetadataDbClient) –
bucket (str) –
prefixes (List[str]) –
- Return type:
Dict[str, str]
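The mapping described above can be sketched with a single `$in` query. This is an illustration under assumptions, not the package's code: `location_to_id_map` is a hypothetical stand-in, and it assumes each record stores its S3 URL in a `location` field.

```python
from typing import Any, Dict, List


def location_to_id_map(
    client: Any, bucket: str, prefixes: List[str]
) -> Dict[str, str]:
    """Hypothetical: map s3 locations to record ids for existing records."""
    # Normalise each prefix and build the s3:// locations to look up.
    locations = [f"s3://{bucket}/{prefix.strip('/')}" for prefix in prefixes]
    records = client.retrieve_docdb_records(
        filter_query={"location": {"$in": locations}},  # assumed field name
        projection={"_id": 1, "location": 1},
    )
    # Locations with no record never appear in `records`, so they get no key.
    return {record["location"]: record["_id"] for record in records}
```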
- aind_data_access_api.utils.does_metadata_record_exist_in_docdb(docdb_api_client: MetadataDbClient, bucket: str, prefix: str) bool¶
For a given bucket and prefix, check if there is already a record in DocDb
- Parameters:
docdb_api_client (MetadataDbClient) –
bucket (str) –
prefix (str) –
- Returns:
True if there is a record in DocDb. Otherwise, False.
- Return type:
bool
- aind_data_access_api.utils.fetch_records_by_filter_list(docdb_api_client: MetadataDbClient, filter_key: str, filter_values: List[str], projection: dict | None = None) List[dict]¶
Queries DocDB for records where the value of a specified field is in a list of values. Uses an aggregation pipeline with $in filter operator.
- Parameters:
docdb_api_client (MetadataDbClient) –
filter_key (str) – The field to filter on.
filter_values (List[str]) – The list of values to filter on.
projection (Optional[dict]) – Subset of fields to return. Default is None which returns all fields.
- Return type:
List[dict]
- aind_data_access_api.utils.get_record_from_docdb(docdb_api_client: MetadataDbClient, record_id: str) dict | None¶
Download a record from docdb using the record _id.
- Parameters:
docdb_api_client (MetadataDbClient) –
record_id (str) –
- Returns:
None if record does not exist. Otherwise, it will return the record as a dict.
- Return type:
Optional[dict]
- aind_data_access_api.utils.get_s3_bucket_and_prefix(s3_location: str) Dict[str, str]¶
For a location url like s3://bucket/prefix, it will return the bucket and prefix. It doesn’t check the scheme is s3. It will strip the leading and trailing forward slashes from the prefix.
- Parameters:
s3_location (str) – For example, ‘s3://some_bucket/some_prefix’
- Returns:
For example, {‘bucket’: ‘some_bucket’, ‘prefix’: ‘some_prefix’}
- Return type:
Dict[str, str]
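The parsing behaviour described above can be reproduced with the standard library. The sketch below is a stand-in for illustration, not the package's implementation, and `split_s3_location` is a hypothetical name.

```python
from typing import Dict
from urllib.parse import urlsplit


def split_s3_location(s3_location: str) -> Dict[str, str]:
    """Split a location url into bucket and prefix without checking the scheme."""
    parts = urlsplit(s3_location)
    # The bucket is the network location; the prefix is the path with
    # leading and trailing forward slashes removed.
    return {"bucket": parts.netloc, "prefix": parts.path.strip("/")}


result = split_s3_location("s3://some_bucket/some_prefix/")
```

Joining the two parts back together as `s3://{bucket}/{prefix}` gives the location without the trailing slash.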
- aind_data_access_api.utils.get_s3_location(bucket: str, prefix: str) str¶
For a given bucket and prefix, return a location url in format s3://{bucket}/{prefix}
- Parameters:
bucket (str) –
prefix (str) –
- Returns:
For example, ‘s3://some_bucket/some_prefix’
- Return type:
str
- aind_data_access_api.utils.paginate_docdb(docdb_api_client: MetadataDbClient, page_size: int = 500, filter_query: dict | None = None, projection: dict | None = None) Iterator[List[dict]]¶
Paginate through records in DocDb.
- Parameters:
docdb_api_client (MetadataDbClient) –
page_size (int) – Default is 500
filter_query (Optional[dict]) –
projection (Optional[dict]) –
- Return type:
Iterator[List[dict]]
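The page-by-page iteration can be sketched as a generator. This is a minimal illustration of the idea and makes no claim about how `paginate_docdb` fetches each page: `paginate` and the `fetch_page(skip, limit)` callable are hypothetical.

```python
from typing import Callable, Iterator, List


def paginate(
    fetch_page: Callable[[int, int], List[dict]], page_size: int = 500
) -> Iterator[List[dict]]:
    """Yield successive pages until the source runs out of records."""
    skip = 0
    while True:
        page = fetch_page(skip, page_size)
        if not page:
            return
        yield page
        # A short page means there is nothing left to fetch.
        if len(page) < page_size:
            return
        skip += page_size


# An in-memory list stands in for the database in this example.
data = [{"_id": i} for i in range(7)]
pages = list(paginate(lambda skip, limit: data[skip:skip + limit], page_size=3))
```

Yielding whole pages keeps memory use bounded by `page_size`, whatever the total number of records.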
Module contents
Init package