Skip to content

SQL Postgres

PgSqlHandler(connection_properties, max_retries=3, retry_wait_time=1, skip_connect=False, **kwargs)

Subclass of SqlHandler used for PostgreSQL.

Unless skip_connect is True, this also connects to the SQL server.

Parameters:

  • connection_properties

    (SqlConnProperties) –

    Object containing connection parameters.

  • max_retries

    (int, default: 3 ) –

    Number of retries that will be attempted when reconnecting or doing queries, by default 3

  • retry_wait_time

    (float, default: 1 ) –

    Wait time in seconds between each connection or query retry, by default 1

  • skip_connect

    (bool, default: False ) –

    If True, the connection will not be established when the object is created.

    If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.

    By default False

  • **kwargs

    (dict, default: {} ) –

    Just kept here for compatibility.

Source code in echo_connhandler/sql_postgres.py
@validate_call
def __init__(
    self,
    connection_properties: SqlConnProperties,
    max_retries: int = 3,
    retry_wait_time: float = 1,
    skip_connect: bool = False,
    **kwargs,  # pylint: disable=unused-argument # noqa
) -> None:
    """Method that initializes the SQL handler.

    Unless ``skip_connect`` is True, this also connects to the SQL server.

    Parameters
    ----------
    connection_properties : SqlConnProperties
        Object containing connection parameters.
    max_retries : int, optional
        Number of retries that will be attempted when reconnecting or doing queries, by default 3
    retry_wait_time : float, optional
        Wait time in seconds between each connection or query retry, by default 1
    skip_connect : bool, optional
        If True, the connection will not be established when the object is created.

        If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.

        By default False
    **kwargs : dict, optional
        Just kept here for compatibility. Any values passed here are ignored and are not forwarded to the parent class.

    """
    # var to store notices
    # NOTE(review): set before super().__init__(), presumably because the parent may already connect (and so
    # receive server notices) during its own initialization — confirm in SqlHandler.
    self.notices = []
    # max number of notices to store
    self._max_notices = 10

    super().__init__(
        connection_properties=connection_properties,
        max_retries=max_retries,
        retry_wait_time=retry_wait_time,
        skip_connect=skip_connect,
    )

create_table(schema, table_name, columns, pkey_cols=None, if_exists='fail', temporary=False)

Method to create a table in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema to create the table in. If temporary is set to True, this must be set to None.

  • table_name

    (str) –

    Name of the table to be created.

  • columns

    (dict[str, str]) –

    Dict in the format {column_name: column_type, ...}.

  • pkey_cols

    (list[str] | None, default: None ) –

    List of columns to be set as primary key. If None, no primary key will be set. By default None.

  • if_exists

    (Literal['fail', 'replace'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and create a new one.

    By default "fail".

  • temporary

    (bool, default: False ) –

    If set to True, the table will be created as temporary. By default False.

Source code in echo_connhandler/sql_postgres.py
@validate_call
def create_table(
    self,
    schema: str | None,
    table_name: str,
    columns: dict[str, str],
    pkey_cols: list[str] | None = None,
    if_exists: Literal["fail", "replace"] = "fail",
    temporary: bool = False,
) -> None:
    """Method to create a table in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema to create the table in. If temporary is set to True, this must be set to None.
        If None (and temporary is False), the table name is used unqualified in the CREATE statement.
    table_name : str
        Name of the table to be created.
    columns : dict[str, str]
        Dict in the format {column_name: column_type, ...}.
        Column names are quoted as identifiers, but column types are inserted into the statement verbatim,
        so they must come from trusted code and never from user input.
    pkey_cols : list[str] | None, optional
        List of columns to be set as primary key. If None, no primary key will be set. By default None.
    if_exists : Literal["fail", "replace"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and create a new one.

        For permanent tables the existence check uses get_table_names(schema=schema).
        For temporary tables no existence check is done beforehand:

        - with "replace", a temporary table of the same name in this session is dropped first, and a
          permanent table with the same name is never touched;
        - with "fail", an already existing temporary table makes the CREATE statement itself fail with a
          database error.

        By default "fail".
    temporary : bool, optional
        If set to True, the table will be created as temporary. By default False.

    Raises
    ------
    TypeError
        If any argument has the wrong type.
    ValueError
        If a primary key column is not part of columns, if if_exists is invalid, if schema is given together
        with temporary=True, or if a permanent table with this name already exists and if_exists is "fail".
    """
    # checking arguments
    if not isinstance(schema, str | type(None)):
        raise TypeError(f"schema should be a string or none, not {type(schema)}")
    if not isinstance(table_name, str):
        raise TypeError(f"table_name should be a string, not {type(table_name)}")
    if not isinstance(columns, dict):
        raise TypeError(f"columns should be a dict, not {type(columns)}")
    if not all(isinstance(col, str) for col in columns):
        raise TypeError(f"all column names should be strings, not {type(columns)}")
    if not all(isinstance(col_type, str) for col_type in columns.values()):
        raise TypeError(f"all column types should be strings, not {type(columns)}")
    if pkey_cols is not None:
        if not isinstance(pkey_cols, list):
            raise TypeError(f"pkey_cols should be a list, not {type(pkey_cols)}")
        if not all(isinstance(col, str) for col in pkey_cols):
            raise TypeError(f"all primary key columns should be strings, not {type(pkey_cols)}")
        if any(col not in columns for col in pkey_cols):
            raise ValueError(f"all primary key columns should be in columns, not {pkey_cols}")
    if if_exists not in ["fail", "replace"]:
        raise ValueError(f"if_exists should be one of ['fail', 'replace'], not {if_exists}")
    if not isinstance(temporary, bool):
        raise TypeError(f"temporary should be a bool, not {type(temporary)}")
    if temporary and schema is not None:
        raise ValueError("If temporary is set to True, schema must be set to None.")

    if temporary:
        # Temporary tables live in the session's own temporary schema and are not reported as "BASE TABLE" by
        # get_table_names(), so a name lookup there could only match unrelated permanent tables.
        if if_exists == "replace":
            # Qualify with the pg_temp alias so only this session's temporary table can ever be dropped. An
            # unqualified DROP is resolved through search_path and, when no temporary table exists, would drop a
            # permanent table with the same name.
            self.execute(
                sql.SQL("DROP TABLE IF EXISTS pg_temp.{table_name}").format(table_name=sql.Identifier(table_name)),
            )
    else:
        # checking if table already exists
        existing_tables = self.get_table_names(schema=schema)
        if table_name in existing_tables:
            if if_exists == "fail":
                raise ValueError(f"Table {table_name} already exists in the database.")
            # if_exists == "replace"
            self.delete_table(schema=schema, table_name=table_name, on_error="ignore")

    # creating table
    query = sql.SQL("CREATE {temporary} TABLE {schema}{table_name} ({columns} {pkey})").format(
        temporary=sql.SQL("TEMPORARY" if temporary else ""),
        schema=sql.SQL("{schema}.").format(schema=sql.Identifier(schema)) if schema else sql.SQL(""),
        table_name=sql.Identifier(table_name),
        columns=sql.SQL(",").join(
            [
                # col_type is deliberately raw SQL: types such as "varchar(10)" cannot be passed as identifiers
                sql.SQL("{col_name} {col_type}").format(col_name=sql.Identifier(col), col_type=sql.SQL(col_type))
                for col, col_type in columns.items()
            ],
        ),
        pkey=sql.SQL(", PRIMARY KEY ({pkey_cols})").format(pkey_cols=sql.SQL(",").join([sql.Identifier(col) for col in pkey_cols]))
        if pkey_cols
        else sql.SQL(""),
    )

    # executing query
    self.execute(query)
    logger.debug(f"Table {table_name} created in schema {schema}.")

delete_table(schema, table_name, on_error='fail')

Method to delete a table in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema of the table to delete.

  • table_name

    (str) –

    Name of the table to be deleted.

  • on_error

    (Literal['fail', 'ignore'], default: 'fail' ) –

    What to do if the table does not exist.

    • fail: Will raise a ValueError.
    • ignore: Will ignore the error.

    By default "fail".

Source code in echo_connhandler/sql_postgres.py
@validate_call
def delete_table(
    self,
    schema: str | None,
    table_name: str,
    on_error: Literal["fail", "ignore"] = "fail",
) -> None:
    """Method to delete a table in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema of the table to delete.
        If None, the unqualified table name is dropped with DROP TABLE IF EXISTS. No existence check is done
        beforehand, so on_error has no effect and a missing table is silently ignored.
    table_name : str
        Name of the table to be deleted.
    on_error : Literal["fail", "ignore"], optional
        What to do if the table does not exist (only checked when schema is given).

        - fail: Will raise a ValueError.
        - ignore: Will ignore the error.

        By default "fail".

    Raises
    ------
    TypeError
        If schema or table_name has the wrong type.
    ValueError
        If on_error is invalid, or if schema is given, table_name is not one of its tables (as reported by
        get_table_names) and on_error is "fail".
    """
    # checking arguments
    if not isinstance(schema, str | type(None)):
        raise TypeError(f"schema should be a string or None, not {type(schema)}")
    if not isinstance(table_name, str):
        raise TypeError(f"table_name should be a string, not {type(table_name)}")
    if on_error not in ["fail", "ignore"]:
        raise ValueError(f"on_error should be one of ['fail', 'ignore'], not {on_error}")

    # checking if table exists
    # Only done when a schema is given. With schema=None the lookup result would not be used, so the
    # database round-trip is skipped entirely.
    if schema is not None and table_name not in self.get_table_names(schema=schema):
        if on_error == "fail":
            raise ValueError(f"Table {table_name} does not exist in the database.")
        return

    # deleting table (IF EXISTS keeps the schema=None path from raising on a missing table)
    query = sql.SQL("DROP TABLE IF EXISTS {schema}{table_name}").format(
        schema=sql.SQL("{schema}.").format(schema=sql.Identifier(schema)) if schema else sql.SQL(""),
        table_name=sql.Identifier(table_name),
    )
    self.execute(query)
    logger.debug(f"Table {table_name} deleted from schema {schema}.")

get_table_columns(schema, table_names, table_types=None)

Method to get the columns in a table and its data types.

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_names

    (list[str]) –

    Name of the desired tables.

  • table_types

    (list[Literal['table', 'view', 'materialized_view']], default: None ) –

    List of table types to be returned. Can be one of ["table", "view", "materialized_view"]. If set to None, will be equal to ["table"], by default None.

Returns:

  • dict[str, dict[str, str]]

    Dict in the format {table_name: {column_name: column_type, ...}, ...}.

Source code in echo_connhandler/sql_postgres.py
@validate_call
def get_table_columns(
    self,
    schema: str | None,
    table_names: list[str],
    table_types: list[Literal["table", "view", "materialized_view"]] | None = None,
) -> dict[str, dict[str, str]]:
    # sourcery skip: class-extract-method
    """Method to get the columns in a table and its data types.

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located. If None, tables are matched by name in every schema.
    table_names : list[str]
        Name of the desired tables.
    table_types : list[Literal["table", "view", "materialized_view"]], optional
        List of table types to be returned. Can be one of ["table", "view", "materialized_view"]. If set to None, will be equal to ["table"], by default None.

    Returns
    -------
    dict[str, dict[str, str]]
        Dict in the format {table_name: {column_name: column_type, ...}, ...}.
        The inner dicts list columns in their table-definition order. An empty table_names returns an empty dict.
        For tables and views the type is information_schema's data_type. For materialized views it comes from
        pg_catalog.format_type() without type modifiers.
        NOTE(review): these two spellings can differ for array and user-defined types.

    Raises
    ------
    TypeError
        If an argument has the wrong type.
    ValueError
        If a table type is invalid or any requested table does not exist.
    """
    # checking arguments
    if not isinstance(table_names, list):
        raise TypeError(f"table_names should be a list, not {type(table_names)}")
    if not all(isinstance(table, str) for table in table_names):
        raise TypeError(f"all table names should be strings, not {type(table_names)}")
    if not isinstance(schema, str | type(None)):
        raise TypeError(f"schema should be a string or none, not {type(schema)}")
    if table_types is not None:
        if not isinstance(table_types, list):
            raise TypeError(f"table_types should be a list, not {type(table_types)}")
        if not all(isinstance(table_type, str) for table_type in table_types):
            raise TypeError(f"all table types should be strings, not {type(table_types)}")
        if any(table_type not in ["table", "view", "materialized_view"] for table_type in table_types):
            raise ValueError(f"all table types should be one of ['table', 'view', 'materialized_view'], not {table_types}")
    # setting default value for table_types
    if table_types is None:
        table_types = ["table"]

    # nothing requested: avoid sending an invalid "IN ()" clause to the server
    if not table_names:
        return {}

    # checking if all tables exist
    existing_tables = self.get_table_names(schema=schema, table_types=table_types)
    if missing_tables := set(table_names) - set(existing_tables):
        raise ValueError(f"Tables {missing_tables} do not exist in the database.")

    tables = sql.SQL(",").join([sql.Literal(table) for table in table_names])

    # tables and views; ordinal_position keeps the columns in definition order
    query = sql.SQL(
        "SELECT table_name, column_name, data_type FROM information_schema.columns WHERE table_name IN ({tables}) {schema} "
        "ORDER BY table_name, table_schema, ordinal_position",
    ).format(
        schema=sql.SQL("AND table_schema = {schema}").format(schema=sql.Literal(schema)) if schema else sql.SQL(""),
        tables=tables,
    )
    frames = [self.read_to_pandas(query)]

    # information_schema.columns does not include materialized views, so read their columns from pg_catalog
    if "materialized_view" in table_types:
        mv_query = sql.SQL(
            "SELECT c.relname AS table_name, a.attname AS column_name, "
            "pg_catalog.format_type(a.atttypid, NULL) AS data_type "
            "FROM pg_catalog.pg_attribute a "
            "JOIN pg_catalog.pg_class c ON c.oid = a.attrelid "
            "JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace "
            "WHERE c.relkind = 'm' AND a.attnum > 0 AND NOT a.attisdropped AND c.relname IN ({tables}) {schema} "
            "ORDER BY c.relname, n.nspname, a.attnum",
        ).format(
            schema=sql.SQL("AND n.nspname = {schema}").format(schema=sql.Literal(schema)) if schema else sql.SQL(""),
            tables=tables,
        )
        frames.append(self.read_to_pandas(mv_query))

    # every requested table gets an entry, in the order it was requested
    result: dict[str, dict[str, str]] = {table: {} for table in table_names}
    for frame in frames:
        for table, column, data_type in frame[["table_name", "column_name", "data_type"]].itertuples(index=False, name=None):
            result[table][column] = data_type
    return result

get_table_names(schema, table_types=None)

Method to get the names of all tables in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_types

    (list[Literal['table', 'view', 'materialized_view']], default: None ) –

    List of table types to be returned. Can be one of ["table", "view", "materialized_view"]. If set to None, will be equal to ["table"], by default None.

Returns:

  • list[str]

    List of table names.

Source code in echo_connhandler/sql_postgres.py
@validate_call
def get_table_names(
    self,
    schema: str | None,
    table_types: list[Literal["table", "view", "materialized_view"]] | None = None,
) -> list[str]:
    """Method to get the names of all tables in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located. If None, tables from every schema are returned.
    table_types : list[Literal["table", "view", "materialized_view"]], optional
        List of table types to be returned. Can be one of ["table", "view", "materialized_view"]. If set to None, will be equal to ["table"], by default None.
        An empty list returns an empty result.

    Returns
    -------
    list[str]
        List of table names: tables/views first, then materialized views.

    Raises
    ------
    TypeError
        If an argument has the wrong type.
    ValueError
        If a table type is invalid.
    """
    # checking arguments
    if not isinstance(schema, str | type(None)):
        raise TypeError(f"schema should be a string or none, not {type(schema)}")
    if table_types is not None:
        if not isinstance(table_types, list):
            raise TypeError(f"table_types should be a list, not {type(table_types)}")
        if not all(isinstance(table_type, str) for table_type in table_types):
            raise TypeError(f"all table types should be strings, not {type(table_types)}")
        if any(table_type not in ["table", "view", "materialized_view"] for table_type in table_types):
            raise ValueError(f"all table types should be one of ['table', 'view', 'materialized_view'], not {table_types}")

    # setting default value for table_types
    if table_types is None:
        table_types = ["table"]

    # information_schema.tables never lists materialized views; those are read from pg_matviews below
    types_mapping = {
        "table": "BASE TABLE",
        "view": "VIEW",
    }
    info_schema_types = [types_mapping[table_type] for table_type in table_types if table_type in types_mapping]

    table_list: list[str] = []

    # getting table and view names (skipped when none were requested, which also avoids an invalid "IN ()")
    if info_schema_types:
        query = sql.SQL(
            "SELECT table_name FROM information_schema.tables WHERE table_type IN ({types}) {schema}",
        ).format(
            schema=sql.SQL("AND table_schema = {schema}").format(schema=sql.Literal(schema)) if schema else sql.SQL(""),
            types=sql.SQL(",").join([sql.Literal(table_type) for table_type in info_schema_types]),
        )
        df = self.read_to_pandas(query)
        table_list = df["table_name"].tolist()

    # getting materialized_view names
    if "materialized_view" in table_types:
        query = sql.SQL(
            "SELECT matviewname FROM pg_matviews {schema}",
        ).format(
            schema=sql.SQL("WHERE schemaname = {schema}").format(schema=sql.Literal(schema)) if schema else sql.SQL(""),
        )
        df = self.read_to_pandas(query)
        table_list += df["matviewname"].tolist()

    return table_list

get_table_pkey(schema, table_names, return_type='columns')

Method to get the primary key of a table.

Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL)

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_names

    (list[str]) –

    Name of the desired tables.

  • return_type

    (Literal['name', 'columns'], default: 'columns' ) –

    What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".

Returns:

  • dict[str, list[str] | str]

    In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}. In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.

Source code in echo_connhandler/sql_postgres.py
@validate_call
def get_table_pkey(
    self,
    schema: str | None,
    table_names: list[str],
    return_type: Literal["name", "columns"] = "columns",
) -> dict[str, list[str] | str]:
    """Method to get the primary key of a table.

    Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL)

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located. If None, tables are matched by name in every schema.
    table_names : list[str]
        Name of the desired tables.
    return_type : Literal["name", "columns"], optional
        What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".

    Returns
    -------
    dict[str, list[str] | str]
        In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}.
        In this case every requested table is present: columns are listed in primary-key order, and the list is
        empty for tables without a primary key.
        In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}, containing only
        tables that have a primary key.
        An empty table_names returns an empty dict.

    Raises
    ------
    TypeError
        If an argument has the wrong type.
    ValueError
        If return_type is invalid or any requested table does not exist.
    """
    # checking arguments
    if not isinstance(table_names, list):
        raise TypeError(f"table_names should be a list, not {type(table_names)}")
    if not all(isinstance(table, str) for table in table_names):
        raise TypeError(f"all table names should be strings, not {type(table_names)}")
    if not isinstance(schema, str | type(None)):
        raise TypeError(f"schema should be a string or none, not {type(schema)}")
    if return_type not in ["name", "columns"]:
        raise ValueError(f"return_type should be one of ['name', 'columns'], not {return_type}")

    # nothing requested: avoid sending an invalid "IN ()" clause to the server
    if not table_names:
        return {}

    # checking if all tables exist
    existing_tables = self.get_table_names(schema=schema)
    if missing_tables := set(table_names) - set(existing_tables):
        raise ValueError(f"Tables {missing_tables} do not exist in the database.")

    tables = sql.SQL(",").join([sql.Literal(table) for table in table_names])

    if return_type == "name":
        # getting primary keys names
        query = sql.SQL(
            "SELECT table_name, constraint_name FROM information_schema.table_constraints WHERE table_name IN ({tables}) AND constraint_type = 'PRIMARY KEY' {schema}",
        ).format(
            schema=sql.SQL("AND table_schema = {schema}").format(schema=sql.Literal(schema)) if schema else sql.SQL(""),
            tables=tables,
        )
        df = self.read_to_pandas(query)
        return dict(df[["table_name", "constraint_name"]].values)

    # getting primary keys columns
    # key_column_usage (unlike constraint_column_usage) exposes ordinal_position, so composite keys come back in
    # key order. Its visibility rule is also based on access rather than ownership.
    query = sql.SQL(
        "SELECT kcu.table_name, kcu.column_name "
        "FROM information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "ON kcu.constraint_schema = tc.constraint_schema "
        "AND kcu.constraint_name = tc.constraint_name "
        "AND kcu.table_schema = tc.table_schema "
        "AND kcu.table_name = tc.table_name "
        "WHERE tc.constraint_type = 'PRIMARY KEY' AND tc.table_name IN ({tables}) {schema} "
        "ORDER BY kcu.table_name, kcu.table_schema, kcu.ordinal_position",
    ).format(
        schema=sql.SQL("AND tc.table_schema = {schema}").format(schema=sql.Literal(schema)) if schema else sql.SQL(""),
        tables=tables,
    )
    df = self.read_to_pandas(query)

    # every requested table is present; tables without a primary key keep an empty list
    pkey_columns: dict[str, list[str] | str] = {table: [] for table in table_names}
    for table, column in df[["table_name", "column_name"]].itertuples(index=False, name=None):
        pkey_columns[table].append(column)
    return pkey_columns