SQL Core

SqlConnProperties(host, user, password, database, port, schema=None, driver=None, conn_timeout=30, query_timeout=300, autocommit=True, application_name=None, time_zone='America/Sao_Paulo') dataclass

Class that holds the properties used to establish a connection to a SQL database.

Parameters:

  • host

    (str) –

    The hostname of the database server.

  • user

    (str) –

    Name of the user used to connect to the database.

  • password

    (str) –

    Password for the user.

  • database

    (str) –

    Name of database to connect to.

  • port

    (int) –

    Port used to connect to the database. Defaults are 5432 for PostgreSQL and 1433 for MS SQL Server.

  • schema

    (str | None, default: None ) –

    Name of the schema to connect to. If set to None, will connect to the default schema. By default None.

  • driver

    (str | None, default: None ) –

    Driver used to connect to the database, by default None.

    Currently this is only applicable to MS SQL Server (something like "ODBC Driver 17 for SQL Server" should be used). If you want to skip pyodbc, set this to "pymssql".

  • conn_timeout

    (int, default: 30 ) –

    Connection timeout in seconds, by default 30.

  • query_timeout

    (int, default: 300 ) –

    Query timeout in seconds, by default 300

  • autocommit

    (bool, default: True ) –

    If set to True, connection will be committed automatically after every transaction. By default True.

  • application_name

    (str | None, default: None ) –

    Name of application. If set to None will be set to f"EchoSqlConnection:{socket.gethostname()}" where socket.gethostname() is the name of the computer running this code. By default None.

  • time_zone

    (str, default: 'America/Sao_Paulo' ) –

    Time zone to be used in the connection. By default "America/Sao_Paulo".
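
Example (a minimal construction sketch; every value is illustrative, and the import path follows the source listing shown below):

from echo_connhandler.sql_core import SqlConnProperties

# Connection properties for a hypothetical PostgreSQL server; all values below are illustrative.
props = SqlConnProperties(
    host="db.example.com",
    user="etl_user",
    password="s3cret",          # prefer reading this from an environment variable or secret store
    database="analytics",
    port=5432,                  # 5432 for PostgreSQL, 1433 for MS SQL Server
    schema="public",            # None would connect to the default schema
    application_name=None,      # None -> __post_init__ fills in "EchoSqlConnection:<hostname>"
)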

__post_init__()

Method that checks if inputs are valid after initialization.

Source code in echo_connhandler/sql_core.py
def __post_init__(self) -> None:
    """Method that checks if inputs are valid after initialization."""
    if not isinstance(self.host, str):
        raise TypeError(f"host must be of type str, not {type(self.host)}")
    if not isinstance(self.user, str):
        raise TypeError(f"user must be of type str, not {type(self.user)}")
    if not isinstance(self.password, str):
        raise TypeError(f"password must be of type str, not {type(self.password)}")
    if not isinstance(self.database, str):
        raise TypeError(f"database must be of type str, not {type(self.database)}")
    if self.schema is not None and not isinstance(self.schema, str):
        raise TypeError(f"schema must be of type str or None, not {type(self.schema)}")
    if not isinstance(self.port, int):
        raise TypeError(f"port must be of type int, not {type(self.port)}")
    if self.driver is not None and not isinstance(self.driver, str):
        raise TypeError(f"driver must be of type str or None, not {type(self.driver)}")
    if not isinstance(self.conn_timeout, int):
        raise TypeError(f"conn_timeout must be of type int, not {type(self.conn_timeout)}")
    if not isinstance(self.query_timeout, int):
        raise TypeError(f"query_timeout must be of type int, not {type(self.query_timeout)}")
    if not isinstance(self.autocommit, bool):
        raise TypeError(f"autocommit must be of type bool, not {type(self.autocommit)}")
    if not isinstance(self.application_name, str | type(None)):
        raise TypeError(f"application_name must be of type str or None, not {type(self.application_name)}")
    if not isinstance(self.time_zone, str):
        raise TypeError(f"time_zone must be of type str, not {type(self.time_zone)}")

    # adjusting application_name
    if self.application_name is None:
        self.application_name = f"EchoSqlConnection:{socket.gethostname()}"

SqlHandler(connection_properties, max_retries=3, retry_wait_time=1, skip_connect=False, **kwargs)

Base abstract class for SQL handlers. This class should not be instantiated directly.

This class should be used to handle connections and queries to SQL servers. The _connect() method should be defined in subclasses, as it may differ for each type of SQL server (PostgreSQL, MySQL, MS SQL Server, etc.).

This already connects to the SQL server.

Parameters:

  • connection_properties

    (SqlConnProperties) –

    Object containing connection parameters.

  • max_retries

    (int, default: 3 ) –

    Number of retries that will be attempted when reconnecting or doing queries, by default 3

  • retry_wait_time

    (float, default: 1 ) –

    Wait time in seconds between each connection or query retry, by default 1

  • skip_connect

    (bool, default: False ) –

    If True, the connection will not be established when the object is created.

    If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.

    By default False

  • **kwargs

    (dict, default: {} ) –

    Just kept here for compatibility.

Source code in echo_connhandler/sql_core.py
def __init__(
    self,
    connection_properties: SqlConnProperties,
    max_retries: int = 3,
    retry_wait_time: float = 1,
    skip_connect: bool = False,
    **kwargs,  # pylint: disable=unused-argument # noqa
) -> None:
    """Method that initializes the SQL handler.

    This already connects to the SQL server.

    Parameters
    ----------
    connection_properties : SqlConnProperties
        Object containing connection parameters.
    max_retries : int, optional
        Number of retries that will be attempted when reconnecting or doing queries, by default 3
    retry_wait_time : float, optional
        Wait time in seconds between each connection or query retry, by default 1
    skip_connect : bool, optional
        If True, the connection will not be established when the object is created.

        If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.

        By default False
    **kwargs : dict, optional
        Just kept here for compatibility.

    """
    super().__init__(
        connection_properties=connection_properties,
        max_retries=max_retries,
        retry_wait_time=retry_wait_time,
        skip_connect=skip_connect,
    )
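
Example (a usage sketch; SqlHandler is abstract, so PostgresSqlHandler below is a hypothetical concrete subclass, and only the constructor arguments come from the signature above):

# "PostgresSqlHandler" is a placeholder name for a concrete SqlHandler subclass.
handler = PostgresSqlHandler(
    connection_properties=props,  # the SqlConnProperties instance built earlier
    max_retries=5,                # retry reconnects and queries up to 5 times
    retry_wait_time=2.0,          # wait 2 seconds between retries
    skip_connect=True,            # do not connect when the object is created
)

# Because skip_connect=True, the connection has to be opened explicitly:
handler.reconnect()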

check_df_columns(df, schema, table_name, must_have_all=False, allow_extra_columns=False)

Method used to check if the columns in a DataFrame match the columns in a table.

This will check both the names of the columns match and if their data types are compatible.

Parameters:

  • df

    (pd.DataFrame | pl.DataFrame) –

    DataFrame to be checked.

  • schema

    (str | None) –

    Schema of the table.

  • table_name

    (str) –

    Name of the table to be checked.

  • must_have_all

    (bool, default: False ) –

    If set to True, all the columns in the SQL table must be present in the DataFrame, by default False.

  • allow_extra_columns

    (bool, default: False ) –

    If set to True, the DataFrame can have columns not present in the SQL table, by default False

Source code in echo_connhandler/sql_core.py
@validate_call
def check_df_columns(
    self,
    df: pd.DataFrame | pl.DataFrame,
    schema: str | None,
    table_name: str,
    must_have_all: bool = False,
    allow_extra_columns: bool = False,
) -> None:
    """Method used to check if the columns in a DataFrame match the columns in a table.

    This will check both the names of the columns match and if their data types are compatible.

    Parameters
    ----------
    df : pd.DataFrame | pl.DataFrame
        DataFrame to be checked.
    schema : str | None
        Schema of the table.
    table_name : str
        Name of the table to be checked.
    must_have_all : bool, optional
        If set to True, all the columns in SQL table must be present in the DataFrame, by default False
    allow_extra_columns : bool, optional
        If set to True, the DataFrame can have columns not present in the SQL table, by default False
    """
    # checking if table exists
    if table_name not in self.get_table_names(schema=schema):
        raise ValueError(f"Table {table_name} does not exist in schema {schema}")

    # getting type of DataFrame
    df_dtype = "pandas" if isinstance(df, pd.DataFrame) else "polars"

    # getting table columns
    table_columns: dict[str, str] = self.get_table_columns(schema=schema, table_names=[table_name])[table_name]

    # checking if all columns in the DataFrame are in the table
    if not allow_extra_columns:  # noqa
        if cols_not_in_table := set(df.columns) - set(table_columns):
            raise ValueError(f"DataFrame has columns not in table: {cols_not_in_table}")
    # checking if all columns in the table are in the DataFrame
    if must_have_all:  # noqa
        if cols_not_in_df := set(table_columns) - set(df.columns):
            raise ValueError(f"must_have_all = True and table has columns not in DataFrame: {cols_not_in_df}")

    # checking if the data types match
    if isinstance(df, pd.DataFrame):
        sql_dtypes = self._get_pandas_sql_dtypes(df)
    elif isinstance(df, pl.DataFrame):
        sql_dtypes = self._get_polars_sql_dtypes(df)
    else:
        raise TypeError(f"df must be of type pd.DataFrame or pl.DataFrame, not {type(df)}")

    for col, dtype in sql_dtypes.items():
        allowed_sql_dtypes = self._data_type_mapping[df_dtype][dtype]["sql"]
        if table_columns[col].lower() not in (x.lower() for x in allowed_sql_dtypes):
            raise ValueError(
                f"Column {col} in DataFrame has dtype {df[col].dtype} which does not match the table's dtype {table_columns[col]}. Allowed SQL dtypes for this column dtype are {allowed_sql_dtypes}.",
            )
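
Example (a validation sketch; handler is assumed to be a connected SqlHandler subclass, and the schema, table, and column names are illustrative):

import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# Raises ValueError if df has columns the table lacks or if the dtypes are incompatible.
handler.check_df_columns(
    df=df,
    schema="public",
    table_name="customers",
    must_have_all=False,        # df may omit columns that exist in the table
    allow_extra_columns=False,  # df must not contain columns missing from the table
)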

commit()

Method to commit the current transaction.

Source code in echo_connhandler/sql_core.py
def commit(self) -> None:
    """Method to commit the current transaction."""
    try:
        self.connection.commit()
    except Exception as e:
        raise RuntimeError(f"Could not commit transaction to {self}") from e

create_table(schema, table_name, columns, pkey_cols=None, if_exists='fail', temporary=False) abstractmethod

Method to create a table in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema to create the table in. If temporary is set to True, this must be set to None.

  • table_name

    (str) –

    Name of the table to be created.

  • columns

    (dict[str, str]) –

    Dict in the format {column_name: column_type, ...}.

  • pkey_cols

    (list[str] | None, default: None ) –

    List of columns to be set as primary key. If None, no primary key will be set. By default None.

  • if_exists

    (Literal['fail', 'replace'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and create a new one.

    By default "fail".

  • temporary

    (bool, default: False ) –

    If set to True, the table will be created as temporary. By default False.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def create_table(
    self,
    schema: str | None,
    table_name: str,
    columns: dict[str, str],
    pkey_cols: list[str] | None = None,
    if_exists: Literal["fail", "replace"] = "fail",
    temporary: bool = False,
) -> None:
    """Method to create a table in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema to create the table in. If temporary is set to True, this must be set to None.
    table_name : str
        Name of the table to be created.
    columns : dict[str, str]
        Dict in the format {column_name: column_type, ...}.
    pkey_cols : list[str] | None, optional
        List of columns to be set as primary key. If None, no primary key will be set. By default None.
    if_exists : Literal["fail", "replace"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and create a new one.

        By default "fail".
    temporary : bool, optional
        If set to True, the table will be created as temporary. By default False.
    """
    raise NotImplementedError("create_table must be implemented in subclass")
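
Example (a sketch; the column type strings must be valid for the target SQL dialect, and all names are illustrative):

handler.create_table(
    schema="public",
    table_name="daily_metrics",
    columns={
        "metric_date": "date",
        "metric_name": "varchar(64)",
        "value": "double precision",
    },
    pkey_cols=["metric_date", "metric_name"],  # composite primary key
    if_exists="fail",                          # raise a ValueError if the table already exists
)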

cursor()

Method to return the cursor to the current connection.

Source code in echo_connhandler/sql_core.py
def cursor(self) -> Any:
    """Method to return the cursor to the current connection."""
    try:
        return self.connection.cursor()
    except Exception:
        self.reconnect()
        return self.connection.cursor()

delete_table(schema, table_name, on_error='fail') abstractmethod

Method to delete a table in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema of the table to delete.

  • table_name

    (str) –

    Name of the table to be deleted.

  • on_error

    (Literal['fail', 'ignore'], default: 'fail' ) –

    What to do if the table does not exist.

    • fail: Will raise a ValueError.
    • ignore: Will ignore the error.

    By default "fail".

Source code in echo_connhandler/sql_core.py
@abstractmethod
def delete_table(
    self,
    schema: str | None,
    table_name: str,
    on_error: Literal["fail", "ignore"] = "fail",
) -> None:
    """Method to delete a table in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema of the table to delete.
    table_name: str
        Name of the table to be deleted.
    on_error : Literal["fail", "ignore"], optional
        What to do if the table does not exist.
        - fail: Will raise a ValueError.
        - ignore: Will ignore the error.
        By default "fail".
    """
    raise NotImplementedError("delete_table must be implemented in subclass")

execute(query, *args, **kwargs)

Method to execute a SQL query using cursor.

Parameters:

  • query

    (SQL | Composed | str) –

    Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

  • *args

    (tuple, default: () ) –

    Any additional positional arguments are passed to cursor.execute.

  • **kwargs

    Any additional keyword arguments are passed to cursor.execute.

    • skip_retry (bool): If set to True, will ignore number of retries set in handler and will raise the error immediately.

Raises:

  • RuntimeError

    Raised when the maximum number of retries has been reached and the query could not be executed.

Source code in echo_connhandler/sql_core.py
@validate_call
def execute(self, query: SQL | Composed | str, *args, **kwargs) -> Any:  # noqa
    """Method to execute a SQL query using cursor.

    Parameters
    ----------
    query : SQL | Composed | str
        Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
    *args : tuple
        Any additional positional arguments are passed to `cursor.execute`.
    **kwargs
        Any additional keyword arguments are passed to `cursor.execute`.
        - skip_retry : bool
            If set to True, will ignore number of retries set in handler and will raise the error immediately.

    Raises
    ------
    RuntimeError
        Raised when the maximum number of retries has been reached and the query could not be executed.

    """
    query_success = False
    attempt = 0
    skip_retry = kwargs.pop("skip_retry") if "skip_retry" in kwargs else False
    while not query_success:
        try:
            # everything is in a try block so if the connection is lost, it can be reconnected and the query can be executed again
            # this is the case for performance_db, where we have configured idle session timeout of 60 seconds

            if isinstance(query, SQL | Composed):
                if isinstance(self.connection, pg.Connection):  # noqa
                    query_str = query.as_string(self.connection)
                else:
                    # ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
                    query_str = query.as_string(None)
            elif isinstance(query, str):
                query_str = query
            else:
                raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

            cur = self.cursor()
            cur.execute(query_str, *args, **kwargs)
            query_success = True
        except Exception as e:
            if skip_retry:
                raise e
            if attempt == 0:
                # storing the first error to be raised later
                first_error = e
            attempt += 1
            if attempt > self.max_retries:
                raise RuntimeError(
                    f"Reachead {attempt} attempts and could not execute query below using {self.connection_properties}. The error presented is of the first try.\n{query_str}",
                ) from first_error  # type: ignore

            sleep(self.retry_wait_time)
            self.reconnect()
    return cur
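
Example (a sketch using a composed psycopg statement, which the documentation above prefers over plain strings; it assumes the handler is backed by a psycopg connection):

from psycopg.sql import SQL, Identifier

# Composed statements keep identifiers safely quoted. Note the caveat in the source above:
# sql.Identifier inside a Composed query only renders against a psycopg connection.
query = SQL("DELETE FROM {table} WHERE created_at < %s").format(
    table=Identifier("public", "staging_rows"),
)

cur = handler.execute(query, ("2024-01-01",), skip_retry=True)  # skip_retry raises on the first failure
handler.commit()  # only needed when autocommit=False in the connection properties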

get_table_columns(schema, table_names) abstractmethod

Method to get the columns of a table and their data types.

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_names

    (list[str]) –

    Name of the desired tables.

Returns:

  • dict[str, dict[str, str]]

    Dict in the format {table_name: {column_name: column_type, ...}, ...}.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_columns(self, schema: str | None, table_names: list[str]) -> dict[str, dict[str, str]]:
    """Method to get the columns in a table and it's data types.

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located.
    table_names : list[str]
        Name of the desired tables.

    Returns
    -------
    dict[str, dict[str, str]]
        Dict in the format {table_name: {column_name: column_type, ...}, ...}.

    """
    raise NotImplementedError("get_table_columns must be implemented in subclass")
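
Example (an illustrative call; the returned values are hypothetical):

cols = handler.get_table_columns(schema="public", table_names=["customers", "orders"])
# Expected shape:
# {"customers": {"id": "integer", "name": "text"},
#  "orders":    {"id": "integer", "total": "numeric"}}
print(cols["customers"]["id"])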

get_table_names(schema, table_types=None) abstractmethod

Method to get the names of all tables in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema to get the tables from.

  • table_types

    (list[Literal['table', 'view']] | None, default: None ) –

    List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.

Returns:

  • list[str]

    List of table names.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_names(self, schema: str | None, table_types: list[Literal["table", "view"]] | None = None) -> list[str]:
    """Method to get the names of all tables in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema to get the tables from.
    table_types : list[Literal["table", "view"]] | None, optional
        List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.

    Returns
    -------
    list[str]
        List of table names.

    """
    raise NotImplementedError("get_table_names must be implemented in subclass")

get_table_pkey(schema, table_names, return_type='columns') abstractmethod

Method to get the primary key of a table.

Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL).

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_names

    (list[str]) –

    Name of the desired tables.

  • return_type

    (Literal['name', 'columns'], default: 'columns' ) –

    What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".

Returns:

  • dict[str, list[str] | str]

    In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}. In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_pkey(
    self,
    schema: str | None,
    table_names: list[str],
    return_type: Literal["name", "columns"] = "columns",
) -> dict[str, list[str] | str]:
    """Method to get the primary key of a table.

    Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL)

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located.
    table_names : list[str]
        Name of the desired tables.
    return_type : Literal["name", "columns"], optional
        What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".

    Returns
    -------
    dict[str, list[str] | str]
        In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}.
        In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.
    """
    raise NotImplementedError("get_table_pkey must be implemented in subclass")

pandas_to_sql(df, schema, table_name, if_exists='fail', ignore_index=False, index_name=None, pkey_cols=None, conflict_cols=None, **kwargs)

Method to write a Pandas DataFrame to a SQL table.

Parameters:

  • df

    (DataFrame) –

    DataFrame to be written.

  • schema

    (str | None) –

    Name of the schema of the table.

  • table_name

    (str) –

    Name of the table to be written to.

  • if_exists

    (Literal['fail', 'replace', 'append', 'update', 'skip_row_check'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and write the DataFrame to a new table.
    • append: Will append the DataFrame to the existing table, skipping rows that already exist.
    • update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
    • skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

    By default "fail".

  • ignore_index

    (bool, default: False ) –

    Whether to ignore the index of the DataFrame, by default False.

  • index_name

    (str | None, default: None ) –

    Name to set the index to. If None, the index will not be renamed. By default None.

  • pkey_cols

    (list[str] | None, default: None ) –

    Name of the columns to set as primary key. If None, no primary key will be set. If it is the same name as the index, the index will be used.

    This is only applicable when replace is used or if the table does not exist.

    By default None.

  • conflict_cols

    (list[str] | None, default: None ) –

    List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used. By default None.

  • **kwargs

    Used for backwards compatibility. Should be ignored.

Source code in echo_connhandler/sql_core.py
@validate_call
def pandas_to_sql(
    self,
    df: pd.DataFrame,
    schema: str | None,
    table_name: str,
    if_exists: Literal["fail", "replace", "append", "update", "skip_row_check"] = "fail",
    ignore_index: bool = False,
    index_name: str | None = None,
    pkey_cols: list[str] | None = None,
    conflict_cols: list[str] | None = None,
    **kwargs,
) -> None:
    """Method to write a Pandas DataFrame to a SQL table.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to be written.
    schema: str | None
        Name of the schema of the table.
    table_name : str
        Name of the table to be written to.
    if_exists : Literal["fail", "replace", "append", "update", "skip_row_check"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and write the DataFrame to a new table.
        - append: Will append the DataFrame to the existing table, skipping rows that already exist.
        - update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
        - skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

        By default "fail".
    ignore_index : bool, optional
        Whether to ignore the index of the DataFrame, by default False.
    index_name : str | None, optional
        Name to set the index to. If None, the index will not be renamed. By default None.
    pkey_cols :  list[str] | None, optional
        Name of the columns to set as primary key. If None, no primary key will be set. If is the same name as the index, index will be used.

        This is only applicable when replace is used or if the table does not exist.

        By default None.
    conflict_cols : list[str] | None, optional
        List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used.
        By default None.
    **kwargs
        Used for backwards compatibility. Should be ignored.
    """
    t1 = perf_counter()

    # pre-processing
    df = self._pre_df_to_sql(
        df=df,
        schema=schema,
        table_name=table_name,
        if_exists=if_exists,
        ignore_index=ignore_index,
        index_name=index_name,
        pkey_cols=pkey_cols,
        conflict_cols=conflict_cols,
        **kwargs,
    )

    # checking if DataFrame is empty
    if df.empty:
        logger.debug("DataFrame is empty. Nothing to do.")
        return

    self._pandas_to_sql(
        df=df,
        schema=schema,
        table_name=table_name,
        if_exists=if_exists,
        conflict_cols=conflict_cols,
        **kwargs,
    )

    logger.debug(
        f"DataFrame of shape {df.shape} written to table {table_name} in schema {schema} in {perf_counter() - t1:.2f} seconds.",
    )
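
Example (an upsert-style write sketch; the table and column names are illustrative, and the target table is assumed to already exist):

import pandas as pd

df = pd.DataFrame({"id": [1, 2, 3], "status": ["new", "open", "closed"]})

handler.pandas_to_sql(
    df=df,
    schema="public",
    table_name="tickets",
    if_exists="update",    # insert new rows and update rows whose key already exists
    ignore_index=True,     # the default RangeIndex carries no information here
    conflict_cols=["id"],  # match existing rows on "id" instead of the table's primary key
)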

polars_to_sql(df, schema, table_name, if_exists='fail', pkey_cols=None, conflict_cols=None, **kwargs)

Method to write a Polars DataFrame to a SQL table.

Parameters:

  • df

    (DataFrame) –

    Polars DataFrame to be written.

  • schema

    (str | None) –

    Name of the schema of the table.

  • table_name

    (str) –

    Name of the table to be written to.

  • if_exists

    (Literal['fail', 'replace', 'append', 'update', 'skip_row_check'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and write the DataFrame to a new table.
    • append: Will append the DataFrame to the existing table, skipping rows that already exist.
    • update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
    • skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

    By default "fail".

  • pkey_cols

    (list[str] | None, default: None ) –

    Name of the columns to set as primary key. If None, no primary key will be set.

    This is only applicable when replace is used or if the table does not exist.

    By default None.

  • conflict_cols

    (list[str] | None, default: None ) –

    List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used. By default None.

  • **kwargs

    Used for backwards compatibility. Should be ignored.

Source code in echo_connhandler/sql_core.py
@validate_call
def polars_to_sql(
    self,
    df: pl.DataFrame,
    schema: str | None,
    table_name: str,
    if_exists: Literal["fail", "replace", "append", "update", "skip_row_check"] = "fail",
    pkey_cols: list[str] | None = None,
    conflict_cols: list[str] | None = None,
    **kwargs,
) -> None:
    """Method to write a Polars DataFrame to a SQL table.

    Parameters
    ----------
    df : pl.DataFrame
        Polars DataFrame to be written.
    schema: str | None
        Name of the schema of the table.
    table_name : str
        Name of the table to be written to.
    if_exists : Literal["fail", "replace", "append", "update", "skip_row_check"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and write the DataFrame to a new table.
        - append: Will append the DataFrame to the existing table, skipping rows that already exist.
        - update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
        - skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

        By default "fail".
    pkey_cols :  list[str] | None, optional
        Name of the columns to set as primary key. If None, no primary key will be set.

        This is only applicable when replace is used or if the table does not exist.

        By default None.
    conflict_cols : list[str] | None, optional
        List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used.
        By default None.
    **kwargs
        Used for backwards compatibility. Should be ignored.
    """
    t1 = perf_counter()

    # pre-processing
    df = self._pre_df_to_sql(
        df=df,
        schema=schema,
        table_name=table_name,
        if_exists=if_exists,
        pkey_cols=pkey_cols,
        conflict_cols=conflict_cols,
        index_name=None,
        ignore_index=True,
        **kwargs,
    )

    # checking if DataFrame is empty
    if len(df) == 0:
        logger.debug("DataFrame is empty. Nothing to do.")
        return

    self._polars_to_sql(
        df=df,
        schema=schema,
        table_name=table_name,
        if_exists=if_exists,
        conflict_cols=conflict_cols,
        **kwargs,
    )

    logger.debug(
        f"DataFrame of shape {df.shape} written to table {table_name} in schema {schema} in {perf_counter() - t1:.2f} seconds.",
    )
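
Example (the Polars counterpart, with the same if_exists semantics; names are illustrative):

import polars as pl

df = pl.DataFrame({"id": [1, 2, 3], "status": ["new", "open", "closed"]})

handler.polars_to_sql(
    df=df,
    schema="public",
    table_name="tickets",
    if_exists="append",  # insert only rows that do not already exist
    pkey_cols=["id"],    # only used if the table has to be created (replace or missing table)
)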

read_to_pandas(query, **kwargs)

Method to read a SQL query into a Pandas DataFrame.

Parameters:

  • query

    (SQL | Composed | str) –

    Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

Other Parameters:

  • dtype_backend (str | None) –

    Backend to use for type conversion. Can be 'pyarrow', 'numpy_nullable' or None (which simulates as if this argument was not passed). By default, 'pyarrow' is used.

  • dtype (dict[str, str]) –

    Dict in the format {column_name: dtype, ...} to be used to convert the columns to the desired data type.

  • index_col (str | list[str]) –

    Column(s) to set as index of the DataFrame.

  • parse_dates (list[str] | dict[str, str]) –

    Column(s) to parse as date(s). If a dict is passed, the keys should be column names and the values should be the date format.

  • post_convert (str | None) –

    Can be set to 'pyarrow', 'numpy_nullable' or None. This will call df.convert_dtypes after reading the DataFrame.

    Can be useful in cases when there is a JSON column that is not being read correctly by using dtype_backend to 'pyarrow'. In this case set dtype_backend to None and post_convert to 'pyarrow'.

  • remove_time_zone (bool) –

    If set to True, will remove the time zone from the datetime columns that are time zone aware. This will keep the values in the time zone defined in the connection properties. Will default to True if not set.

  • skip_retry (bool) –

    If set to True, will ignore number of retries set in handler and will raise the error immediately.

Returns:

  • DataFrame

    DataFrame containing the results of the query.

Source code in echo_connhandler/sql_core.py
@validate_call
def read_to_pandas(self, query: SQL | Composed | str, **kwargs: Unpack[ReadToPandasKwargs]) -> pd.DataFrame:
    """Method to read a SQL query into a Pandas DataFrame.

    Parameters
    ----------
    query : SQL | Composed | str
        Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

    Other Parameters
    ----------------
    dtype_backend : str | None
        Backend to use for type conversion. Can be 'pyarrow', 'numpy_nullable' or None (which simulates as if this argument was not passed).
        By default we are using pyarrow
    dtype : dict[str, str]
        Dict in the format {column_name: dtype, ...} to be used to convert the columns to the desired data type.
    index_col : str | list[str]
        Column(s) to set as index of the DataFrame.
    parse_dates : list[str] | dict[str, str]
        Column(s) to parse as date(s). If a dict is passed, the keys should be column names and the values should be the date format.
    post_convert: str | None
        Can be set to 'pyarrow', 'numpy_nullable' or None. This will call df.convert_dtypes after reading the DataFrame.

        Can be useful in cases when there is a JSON column that is not being read correctly by using dtype_backend to 'pyarrow'. In this case set dtype_backend to None and post_convert to 'pyarrow'.
    remove_time_zone : bool
        If set to True, will remove the time zone from the datetime columns that are time zone aware. This will keep the values in the time zone defined in the connection properties.
        Will default to True if not set.
    skip_retry : bool
        If set to True, will ignore number of retries set in handler and will raise the error immediately.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the results of the query.
    """
    # defining dtype_backend to pyarrow as default
    if "dtype_backend" not in kwargs:
        if "post_convert" not in kwargs or kwargs["post_convert"] is None:
            kwargs["dtype_backend"] = "pyarrow"
        else:
            kwargs["dtype_backend"] = None
    # deleting dtype_backend if was set to None
    if kwargs["dtype_backend"] is None:
        del kwargs["dtype_backend"]

    remove_time_zone = True if "remove_time_zone" not in kwargs else kwargs.pop("remove_time_zone")

    # getting post_convert
    post_convert = kwargs.pop("post_convert", None)
    if post_convert not in [None, "pyarrow", "numpy_nullable"]:
        raise ValueError(f"post_convert must be one of [None, 'pyarrow', 'numpy_nullable'], not {post_convert}")
    if "dtype_backend" in kwargs and post_convert is not None:
        raise ValueError("post_convert can only be used if dtype_backend is not set")

    query_success = False
    attempt = 0
    skip_retry = kwargs.pop("skip_retry") if "skip_retry" in kwargs else False

    while not query_success:
        try:
            # everything is in a try block so if the connection is lost, it can be reconnected and the query can be executed again
            # this is the case for performance_db, where we have configured idle session timeout of 60 seconds

            if isinstance(query, SQL | Composed):
                if isinstance(self.connection, pg.Connection):  # noqa
                    query_str = query.as_string(self.connection)
                else:
                    # ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
                    query_str = query.as_string(None)
            elif isinstance(query, str):
                query_str = query
            else:
                raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

            df: pd.DataFrame = pd.read_sql(sql=query_str, con=self.connection, **kwargs)
            query_success = True
        except Exception as e:
            if skip_retry:
                raise e
            if attempt == 0:
                # storing the first error to be raised later
                first_error = e
            attempt += 1
            if attempt > self.max_retries:
                raise RuntimeError(
                    f"Reachead {attempt} attempts and could not execute query below using {self.connection_properties}. The error presented is of the first try.\n{query_str}",
                ) from first_error  # type: ignore

            sleep(self.retry_wait_time)
            self.reconnect()

    # checking if we should remove the time zone
    if remove_time_zone:
        # iterating columns
        for col in df.columns:
            # checking if column has dt attribute
            if "duration" in str(df[col].dtype).lower() or "timedelta" in str(df[col].dtype).lower():
                continue
            # sourcery skip: merge-nested-ifs
            if hasattr(df[col], "dt"):  # noqa: SIM102
                # skipping if is duration
                # checking if column is time zone aware
                if df[col].dt.tz is not None:
                    # removing time zone
                    # here we have to convert to datetime64 because pyarrow has a bug that makes the final result stay at UTC
                    df[col] = (
                        df[col].astype(f"datetime64[{df[col].dt.unit}, {self.connection_properties.time_zone}]").dt.tz_localize(None)
                    )

    # converting the DataFrame to the desired data types
    if post_convert is not None:
        df = df.convert_dtypes(dtype_backend=post_convert)

    # doing any post-processing of the DataFrame
    df = self._process_pandas_read(df, **kwargs)

    return df
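
Example (a read sketch; the query, column names, and keyword values are illustrative):

df = handler.read_to_pandas(
    "SELECT id, created_at, payload FROM public.events WHERE created_at >= '2024-01-01'",
    index_col="id",
    parse_dates=["created_at"],
    dtype_backend=None,      # disable the pyarrow default for the initial read...
    post_convert="pyarrow",  # ...and convert afterwards, as suggested above for JSON columns
)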

read_to_polars(query, **kwargs)

Method to read a SQL query into a Polars DataFrame.

Parameters:

  • query

    (SQL | Composed | str) –

    Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

Other Parameters:

  • schema_overrides (dict[str, DataTypeClass] | None) –

    A dictionary mapping column names to dtypes, used to override the schema inferred from the query. An example would be {"col1": pl.Int64, "col2": pl.Float64}.

  • infer_schema_length (int | None) –

    The maximum number of rows to scan for schema inference. If set to None, the full data may be scanned. Polars sets 100 by default.

  • skip_retry (bool) –

    If set to True, will ignore number of retries set in handler and will raise the error immediately.

Returns:

  • DataFrame

    DataFrame containing the results of the query.

Source code in echo_connhandler/sql_core.py
@validate_call
def read_to_polars(self, query: SQL | Composed | str, **kwargs: Unpack[ReadToPolarsKwargs]) -> pl.DataFrame:
    """Method to read a SQL query into a Polars DataFrame.

    Parameters
    ----------
    query : SQL | Composed | str
        Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

    Other Parameters
    ----------------
    schema_overrides : dict[str, pl.DataTypeClass] | None
        A dictionary mapping column names to dtypes, used to override the schema inferred from the query. An example would be {"col1": pl.Int64, "col2": pl.Float64}.
    infer_schema_length : int | None
        The maximum number of rows to scan for schema inference. If set to None, the full data may be scanned. Polars sets 100 by default.
    skip_retry : bool
        If set to True, will ignore number of retries set in handler and will raise the error immediately.

    Returns
    -------
    pl.DataFrame
        DataFrame containing the results of the query.
    """
    query_success = False
    attempt = 0
    skip_retry = kwargs.pop("skip_retry") if "skip_retry" in kwargs else False

    while not query_success:
        try:
            # everything is in a try block so if the connection is lost, it can be reconnected and the query can be executed again
            # this is the case for performance_db, where we have configured idle session timeout of 60 seconds

            if isinstance(query, SQL | Composed):
                if isinstance(self.connection, pg.Connection):  # noqa
                    query_str = query.as_string(self.connection)
                else:
                    # ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
                    query_str = query.as_string(None)
            elif isinstance(query, str):
                query_str = query
            else:
                raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

            df: pl.DataFrame = pl.read_database(query=query, connection=self.connection, **kwargs)
            query_success = True
        except Exception as e:
            if skip_retry:
                raise e
            if attempt == 0:
                # storing the first error to be raised later
                first_error = e
            attempt += 1
            if attempt > self.max_retries:
                raise RuntimeError(
                    f"Reachead {attempt} attempts and could not execute query below using {self.connection_properties}. The error presented is of the first try.\n{query_str}",
                ) from first_error  # type: ignore

            sleep(self.retry_wait_time)
            self.reconnect()

    return df
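
Example (a read sketch; the query and the overridden column are illustrative):

import polars as pl

df = handler.read_to_polars(
    "SELECT id, total FROM public.orders",
    schema_overrides={"total": pl.Float64},  # force Float64 instead of the inferred dtype
    infer_schema_length=None,                # scan all rows when inferring the remaining columns
)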