aind_data_access_api package

Subpackages

Submodules

aind_data_access_api.credentials module

Module to manage credentials to connect to databases.

class aind_data_access_api.credentials.AWSConfigSettingsSource(settings_cls, config_file_location)

Bases: JsonConfigSettingsSource

Class that parses settings from AWS Secrets Manager.

class aind_data_access_api.credentials.CoreCredentials(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | None = None, _secrets_dir: PathType | None = None, *, aws_secrets_name: str | None = None, username: str, password: SecretStr, host: str, port: int, database: str | None = None)

Bases: BaseSettings

Core credentials for most of our databases.

aws_secrets_name: str | None
database: str | None
host: str
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

password: SecretStr
port: int
classmethod settings_customise_sources(settings_cls: Type[BaseSettings], init_settings: InitSettingsSource, env_settings: EnvSettingsSource, dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource) Tuple[PydanticBaseSettingsSource, ...]

Method to pull configs from a variety of sources, such as a file or AWS. Arguments are required and set by pydantic.

Parameters:
  • settings_cls (Type[BaseSettings]) – Top level class. Model fields can be pulled from this.

  • init_settings (InitSettingsSource) – The settings in the init arguments.

  • env_settings (EnvSettingsSource) – The settings pulled from environment variables.

  • dotenv_settings (PydanticBaseSettingsSource) – Settings from .env files. Currently not supported.

  • file_secret_settings (PydanticBaseSettingsSource) – Settings from secret files, such as those used in Docker. Currently not supported.

Return type:

Tuple[PydanticBaseSettingsSource, …]

username: str
class aind_data_access_api.credentials.JsonConfigSettingsSource(settings_cls, config_file_location)

Bases: PydanticBaseSettingsSource, ABC

Abstract base class for settings sources that parse JSON.

get_field_value(field: FieldInfo, field_name: str) Tuple[Any, str, bool]

Gets the value, the key for model creation, and a flag to determine whether the value is complex.

Parameters:
  • field (FieldInfo) – The field

  • field_name (str) – The field name

Returns:

A tuple containing the value, the key, and a flag to determine whether the value is complex.

Return type:

Tuple[Any, str, bool]

prepare_field_value(field_name: str, field: FieldInfo, value: Any, value_is_complex: bool) Any

Prepares the value of a field.

Parameters:
  • field_name (str) – The field name

  • field (FieldInfo) – The field

  • value (Any) – The value of the field that has to be prepared

  • value_is_complex (bool) – A flag to determine whether value is complex

Returns:

The prepared value

Return type:

Any

aind_data_access_api.document_db module

Module to interface with the DocumentDB

class aind_data_access_api.document_db.AnalysisDbClient(host: str, collection: str, database: str = 'analysis', version: str = 'v1', boto: Session | None = None, session: Session | None = None)

Bases: Client

Class to manage reading and writing to analysis db

class aind_data_access_api.document_db.Client(host: str, database: str, collection: str, version: str = 'v1', boto: Session | None = None, session: Session | None = None)

Bases: object

Class to create a client to interface with DocumentDB via a REST API

aggregate_docdb_records(pipeline: List[dict]) List[dict]

Aggregate records using an aggregation pipeline.

property boto: Session

Boto3 session

close()

Close the clients.

delete_many_records(data_asset_record_ids: List[str]) Response

Delete many records by their ids

delete_one_record(data_asset_record_id: str) Response

Delete one record by id

fetch_records_by_filter_list(filter_key: str, filter_values: List[str], projection: dict | None = None) List[dict]

Queries DocDB for records where the value of a specified field is in a list of values. Uses an aggregation pipeline with $in filter operator.

Parameters:
  • filter_key (str) – The field to filter on.

  • filter_values (List[str]) – The list of values to filter on.

  • projection (Optional[dict]) – Subset of fields to return. Default is None which returns all fields.

Return type:

List[dict]
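The description above mentions an aggregation pipeline with the $in operator. A minimal sketch of what such a pipeline might look like, assuming a $match stage followed by an optional $project stage. The helper name build_in_filter_pipeline and the field names are hypothetical and not part of the package; the actual implementation may differ.

```python
from typing import List, Optional


def build_in_filter_pipeline(
    filter_key: str,
    filter_values: List[str],
    projection: Optional[dict] = None,
) -> List[dict]:
    """Hypothetical helper: build a pipeline that matches records whose
    filter_key value is one of filter_values."""
    # $match with $in keeps only records whose field value is in the list
    pipeline: List[dict] = [{"$match": {filter_key: {"$in": filter_values}}}]
    # $project is only added when a projection is requested
    if projection is not None:
        pipeline.append({"$project": projection})
    return pipeline


# Field name and values below are illustrative assumptions
pipeline = build_in_filter_pipeline(
    "subject.subject_id", ["123456", "654321"], {"name": 1, "_id": 0}
)
```

A pipeline of this shape could presumably also be passed directly to aggregate_docdb_records.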

insert_one_docdb_record(record: dict) Response

Insert one new record

retrieve_docdb_records(filter_query: dict | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0, paginate: bool | None = None, paginate_batch_size: int | None = None, paginate_max_iterations: int | None = None) List[dict]

Retrieve raw json records from DocDB API Gateway as a list of dicts. Queries to the API Gateway are paginated.

Parameters:
  • filter_query (Optional[dict]) – Filter to apply to the records being returned. Default is None.

  • projection (Optional[dict]) – Subset of document fields to return. Default is None.

  • sort (Optional[dict]) – Sort records when returned. Default is None.

  • limit (int) – Return a smaller set of records. 0 for all records. Default is 0.

  • paginate (bool) – (deprecated) If set to true, will batch the queries to the API Gateway.

  • paginate_batch_size (int) – (deprecated) Number of records to return at a time. Default is 500.

  • paginate_max_iterations (int) – (deprecated) Max number of iterations to run to prevent indefinite calls to the API Gateway. Default is 20000.

Return type:

List[dict]
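As an illustration of how the parameters above fit together, here is a hedged usage sketch. The helper latest_assets_for_subject is hypothetical, the field names (subject.subject_id, name, location, created) are assumptions about the record schema, and client stands for any object exposing retrieve_docdb_records with the signature documented above, such as a MetadataDbClient.

```python
from typing import Any, List


def latest_assets_for_subject(
    client: Any, subject_id: str, limit: int = 5
) -> List[dict]:
    """Hypothetical helper: fetch a few records for one subject.

    The field names used here are assumptions about the record schema.
    """
    return client.retrieve_docdb_records(
        # Only records for this subject
        filter_query={"subject.subject_id": subject_id},
        # Return a subset of fields instead of whole documents
        projection={"name": 1, "location": 1, "_id": 0},
        # Assumed MongoDB-style sort spec: -1 for descending order
        sort={"created": -1},
        # 0 would return all matching records
        limit=limit,
    )
```

The deprecated paginate arguments are left out, since the method description states that queries to the API Gateway are paginated.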

property session: Session

Requests session

upsert_list_of_docdb_records(records: List[dict], max_payload_size: int = 5000000.0) List[Response]

Upsert a list of records. There’s a limit to the size of the request that can be sent, so we chunk the requests.

Parameters:
  • records (List[dict]) – List of records to upsert into the DocDB database

  • max_payload_size (int) – Chunk requests into smaller lists no bigger than this value in bytes. If a single record is larger than this value, an attempt will still be made to upsert it, but the request will most likely receive a 413 status code. The default is 5e6 bytes, per the signature above. The max payload for the API Gateway, including headers, is 10MB.

Returns:

A list of responses from the API Gateway.

Return type:

List[Response]
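The chunking behaviour described above can be sketched as follows. This is an illustration only, not the package's implementation: chunk_records_by_size is a hypothetical helper, and it approximates the payload size as the sum of each record's JSON-encoded length, ignoring list brackets, separators, and headers.

```python
import json
from typing import Iterator, List


def chunk_records_by_size(
    records: List[dict], max_payload_size: int
) -> Iterator[List[dict]]:
    """Hypothetical helper: yield lists of records whose combined
    JSON size stays within max_payload_size bytes where possible."""
    chunk: List[dict] = []
    chunk_size = 0
    for record in records:
        record_size = len(json.dumps(record).encode("utf-8"))
        # Start a new chunk when adding this record would exceed the limit
        if chunk and chunk_size + record_size > max_payload_size:
            yield chunk
            chunk, chunk_size = [], 0
        # An oversized record still goes out, alone in its own chunk
        chunk.append(record)
        chunk_size += record_size
    if chunk:
        yield chunk


records = [{"_id": str(i), "name": "x" * 10} for i in range(5)]
chunks = list(chunk_records_by_size(records, max_payload_size=70))
```

Each yielded chunk would correspond to one request, which is why the method returns a list of responses rather than a single one.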

upsert_one_docdb_record(record: dict) Response

Upsert one record if the record is not corrupt

class aind_data_access_api.document_db.MetadataDbClient(host: str, database: str = 'metadata_index', collection: str = 'data_assets', version: str = 'v1', boto: Session | None = None, session: Session | None = None)

Bases: Client

Class to manage reading and writing to metadata db

add_qc_evaluation(data_asset_id: str, qc_contents: Dict[str, Any]) Dict[str, Any]

Add one or more QC evaluations (or other QC content) to a data asset.

deregister_asset(s3_location: str) Dict[str, Any]

De-register (delete) a data asset from Code Ocean and remove its metadata from DocDB given that the asset and its metadata are located at the provided S3 location.

Parameters:

s3_location (str) – The S3 location containing the asset and metadata to be removed.

Returns:

The response from the deregistration API, including deregistration status and details.

Return type:

Dict[str, Any]

generate_data_summary(record_id: str) Dict[str, Any]

Get an LLM-generated summary for a data asset with the given record id.

register_asset(s3_location: str) Dict[str, Any]

Register a data asset to Code Ocean and add its metadata to DocDB given the metadata exists at the top level of the provided S3 location.

Parameters:

s3_location (str) – The S3 location containing the asset and its metadata.

Returns:

The response from the registration API, including registration status and details.

Return type:

Dict[str, Any]

register_co_result(s3_location: str, name: str, co_asset_id: str, co_computation_id: str) Dict[str, Any]

Register a Code Ocean result asset and add its metadata to DocDB given the metadata exists at the top level of the Code Ocean computation result.

Parameters:
  • s3_location (str) – The S3 location containing the result asset and its metadata.

  • name (str) – The name of the result asset.

  • co_asset_id (str) – The Code Ocean asset ID for the result.

  • co_computation_id (str) – The Code Ocean computation ID associated with the result.

Returns:

The response from the registration API, including registration status and details.

Return type:

Dict[str, Any]

class aind_data_access_api.document_db.SchemaDbClient(host: str, collection: str, database: str = 'schemas', version: str = 'v1', boto: Session | None = None, session: Session | None = None)

Bases: Client

Class to manage reading and writing to schemas db

retrieve_schema_records(schema_version: str | None = None, projection: dict | None = None, sort: dict | None = None, limit: int = 0) List[dict]

Retrieve schemas records from DocDB API Gateway as a list of dicts.

Parameters:
  • schema_version (Optional[str]) – Schema version to use as a filter_query. Default is None.

  • projection (Optional[dict]) – Subset of document fields to return. Default is None.

  • sort (Optional[dict]) – Sort records when returned. Default is None.

  • limit (int) – Return a smaller set of records. 0 for all records. Default is 0.

Return type:

List[dict]

aind_data_access_api.document_db_ssh module

Module to interface with the Document Database using SSH tunneling.

class aind_data_access_api.document_db_ssh.DocumentDbSSHClient(credentials: DocumentDbSSHCredentials)

Bases: object

Class to establish a Document Store client with SSH tunneling.

close()

Close the client and SSH tunnel.

property collection

Collection of metadata records in Document Database.

start()

Start the client and SSH tunnel.
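Because start() opens an SSH tunnel that close() must later tear down, it may help to pair the two calls so the tunnel is closed even when an error occurs. A minimal sketch, assuming only the start() and close() methods documented here; open_client is a hypothetical wrapper and not part of the package.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def open_client(client: Any) -> Iterator[Any]:
    """Hypothetical wrapper: start the client and always close it."""
    client.start()
    try:
        yield client
    finally:
        # Runs on normal exit and on exceptions alike
        client.close()
```

With a DocumentDbSSHClient passed in, the collection property would be used inside the with block, while the tunnel is open.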

class aind_data_access_api.document_db_ssh.DocumentDbSSHCredentials(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | None = None, _secrets_dir: PathType | None = None, *, aws_secrets_name: str | None = None, username: str, password: SecretStr, host: str, port: int = 27017, database: str = 'metadata_index', collection: str = 'data_assets', ssh_local_bind_address: str = 'localhost', ssh_host: str, ssh_port: int = 22, ssh_username: str, ssh_password: SecretStr)

Bases: CoreCredentials

Document Store credentials with SSH tunneling.

collection: str
database: str
classmethod from_secrets_manager(doc_db_secret_name: str, ssh_secret_name: str)

Construct class from AWS Secrets Manager

Parameters:
  • doc_db_secret_name (str) – The name of the secret that contains the document store credentials (host, port, username, password).

  • ssh_secret_name (str) – The name of the secret that contains the ssh credentials (host, username, password).

host: str
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'DOC_DB_', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

password: SecretStr
port: int
ssh_host: str
ssh_local_bind_address: str
ssh_password: SecretStr
ssh_port: int
ssh_username: str
username: str

aind_data_access_api.rds_tables module

Module to interface with the Relational Database

class aind_data_access_api.rds_tables.Client(credentials: RDSCredentials, drivername: str | None = 'postgresql')

Bases: object

Class to establish a relational database client. Includes methods to read/write pandas dataframes to backend.

append_df_to_table(df: DataFrame, table_name: str, dtype: dict | str | None = None) None

Append a dataframe to an existing table.

Parameters:
  • df (pd.DataFrame) –

  • table_name (str) –

  • dtype (Optional[Union[dict, str]]) –

Return type:

None

execute_query(query: str) CursorResult

Run a SQL query against the database.

Parameters:

query (str) –

Returns:

The result of the query.

Return type:

CursorResult

overwrite_table_with_df(df: DataFrame, table_name: str, dtype: dict | str | None = None) None

Overwrite an existing table with a dataframe.

Parameters:
  • df (pd.DataFrame) –

  • table_name (str) –

  • dtype (Optional[Union[dict, str]]) –

Return type:

None

read_table(table_name: str, where_clause: str | None = None) DataFrame

Import a SQL table as a pandas dataframe.

Parameters:
  • table_name (str) –

  • where_clause (Optional[str]) – If None, this method will pull the entire table. The user can set a custom clause if additional filtering is desired. Default is None.

Returns:

A pandas dataframe created from the sql table.

Return type:

pd.DataFrame

class aind_data_access_api.rds_tables.RDSCredentials(_case_sensitive: bool | None = None, _nested_model_default_partial_update: bool | None = None, _env_prefix: str | None = None, _env_file: DotenvType | None = PosixPath('.'), _env_file_encoding: str | None = None, _env_ignore_empty: bool | None = None, _env_nested_delimiter: str | None = None, _env_nested_max_split: int | None = None, _env_parse_none_str: str | None = None, _env_parse_enums: bool | None = None, _cli_prog_name: str | None = None, _cli_parse_args: bool | list[str] | tuple[str, ...] | None = None, _cli_settings_source: CliSettingsSource[Any] | None = None, _cli_parse_none_str: str | None = None, _cli_hide_none_type: bool | None = None, _cli_avoid_json: bool | None = None, _cli_enforce_required: bool | None = None, _cli_use_class_docs_for_groups: bool | None = None, _cli_exit_on_error: bool | None = None, _cli_prefix: str | None = None, _cli_flag_prefix_char: str | None = None, _cli_implicit_flags: bool | None = None, _cli_ignore_unknown_args: bool | None = None, _cli_kebab_case: bool | None = None, _secrets_dir: PathType | None = None, *, aws_secrets_name: str | None = None, username: str, password: SecretStr, host: str, port: int = 5432, database: str | None = None, dbname: str | None = None)

Bases: CoreCredentials

RDS db credentials

dbname: str | None
host: str
model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'RDS_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

password: SecretStr
port: int
username: str
validate_database_name() Self

Sets database to the value of dbname

aind_data_access_api.secrets module

Module to access secrets and parameters

aind_data_access_api.secrets.get_parameter(parameter_name: str, with_decryption=False) str

Retrieves a parameter from AWS Parameter Store.

Parameters:

parameter_name (str) – The name of the parameter to retrieve.

aind_data_access_api.secrets.get_secret(secret_name: str) dict

Retrieves a secret from AWS Secrets Manager.

Parameters:

secret_name (str) – The name of the secret to retrieve.

aind_data_access_api.utils module

Package for common methods used for interfacing with DocDB.

aind_data_access_api.utils.build_docdb_location_to_id_map(docdb_api_client: MetadataDbClient, bucket: str, prefixes: List[str]) Dict[str, str]

For a given s3 bucket and list of prefixes, return a dictionary that looks like {‘s3://bucket/prefix’: ‘abc-1234’} where the value is the id of the record in DocDb. If the record does not exist, then there will be no key in the dictionary.

Parameters:
  • docdb_api_client (MetadataDbClient) –

  • bucket (str) –

  • prefixes (List[str]) –

Return type:

Dict[str, str]
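The shape of the returned dictionary can be illustrated with in-memory records. This is a sketch under assumptions: location_to_id_map is a hypothetical helper, and it assumes each record stores its S3 URL in a location field and its id in _id. It does not show how the package queries DocDB.

```python
from typing import Dict, List


def location_to_id_map(records: List[dict]) -> Dict[str, str]:
    """Hypothetical helper: map each record's location to its _id."""
    return {
        record["location"]: record["_id"]
        for record in records
        if "location" in record and "_id" in record
    }


bucket = "some_bucket"
prefixes = ["prefix_a", "prefix_b"]
# Locations that would be looked up for the given bucket and prefixes
locations = [f"s3://{bucket}/{prefix}" for prefix in prefixes]
# Assumed query result: only prefix_a has a record in DocDB
records = [{"_id": "abc-1234", "location": "s3://some_bucket/prefix_a"}]
mapping = location_to_id_map(records)
```

Here prefix_b has no record, so its location does not appear as a key in the mapping.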

aind_data_access_api.utils.does_metadata_record_exist_in_docdb(docdb_api_client: MetadataDbClient, bucket: str, prefix: str) bool

For a given bucket and prefix, check if there is already a record in DocDb

Parameters:
  • docdb_api_client (MetadataDbClient) –

  • bucket (str) –

  • prefix (str) –

Returns:

True if there is a record in DocDb. Otherwise, False.

Return type:

bool

aind_data_access_api.utils.fetch_records_by_filter_list(docdb_api_client: MetadataDbClient, filter_key: str, filter_values: List[str], projection: dict | None = None) List[dict]

Queries DocDB for records where the value of a specified field is in a list of values. Uses an aggregation pipeline with $in filter operator.

Parameters:
  • docdb_api_client (MetadataDbClient) –

  • filter_key (str) – The field to filter on.

  • filter_values (List[str]) – The list of values to filter on.

  • projection (Optional[dict]) – Subset of fields to return. Default is None which returns all fields.

Return type:

List[dict]

aind_data_access_api.utils.get_record_from_docdb(docdb_api_client: MetadataDbClient, record_id: str) dict | None

Download a record from docdb using the record _id.

Parameters:
  • docdb_api_client (MetadataDbClient) –

  • record_id (str) –

Returns:

None if record does not exist. Otherwise, it will return the record as a dict.

Return type:

Optional[dict]

aind_data_access_api.utils.get_s3_bucket_and_prefix(s3_location: str) Dict[str, str]

For a location url like s3://bucket/prefix, it will return the bucket and prefix. It doesn’t check the scheme is s3. It will strip the leading and trailing forward slashes from the prefix.

Parameters:

s3_location (str) – For example, ‘s3://some_bucket/some_prefix’

Returns:

For example, {‘bucket’: ‘some_bucket’, ‘prefix’: ‘some_prefix’}

Return type:

Dict[str, str]
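The parsing behaviour described above can be approximated with the standard library. A minimal sketch, assuming urllib.parse is a reasonable stand-in; split_s3_location is a hypothetical name and the package's own implementation may differ.

```python
from typing import Dict
from urllib.parse import urlparse


def split_s3_location(s3_location: str) -> Dict[str, str]:
    """Hypothetical helper: split a location url into bucket and prefix."""
    parts = urlparse(s3_location)
    # The scheme is not checked; slashes around the prefix are stripped
    return {"bucket": parts.netloc, "prefix": parts.path.strip("/")}


result = split_s3_location("s3://some_bucket/some_prefix/")
```

Stripping the slashes means that locations with and without a trailing slash yield the same prefix.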

aind_data_access_api.utils.get_s3_location(bucket: str, prefix: str) str

For a given bucket and prefix, return a location url in format s3://{bucket}/{prefix}

Parameters:
  • bucket (str) –

  • prefix (str) –

Returns:

For example, ‘s3://some_bucket/some_prefix’

Return type:

str

aind_data_access_api.utils.paginate_docdb(docdb_api_client: MetadataDbClient, page_size: int = 500, filter_query: dict | None = None, projection: dict | None = None) Iterator[List[dict]]

Paginate through records in DocDb.

Parameters:
  • docdb_api_client (MetadataDbClient) –

  • page_size (int) – Default is 500

  • filter_query (Optional[dict]) –

  • projection (Optional[dict]) –

Return type:

Iterator[List[dict]]
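Since the function returns an iterator of lists, each iteration yields one page of records. The general pattern can be sketched as below, assuming a skip/limit style of paging. The paginate function and the fetch_page callable are hypothetical stand-ins, and the package may page through DocDB differently.

```python
from typing import Callable, Iterator, List


def paginate(
    fetch_page: Callable[[int, int], List[dict]], page_size: int = 500
) -> Iterator[List[dict]]:
    """Hypothetical sketch: yield pages until the source is exhausted."""
    skip = 0
    while True:
        page = fetch_page(skip, page_size)
        if not page:
            break
        yield page
        # A short page means there is nothing left to fetch
        if len(page) < page_size:
            break
        skip += page_size


# In-memory stand-in for a database query with skip and limit
data = [{"_id": str(i)} for i in range(7)]
pages = list(paginate(lambda skip, limit: data[skip:skip + limit], page_size=3))
```

Consuming the pages lazily keeps memory use bounded by page_size, which is the usual reason to prefer an iterator over one large list.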

Module contents

Init package