
SQL Core

SqlConnProperties(host, user, password, database, port, schema=None, driver=None, conn_timeout=30, query_timeout=300, autocommit=True, application_name=None, time_zone='America/Sao_Paulo', auto_close_connection=True) dataclass

Class that holds the properties used to establish a connection to a SQL database.

Parameters:

  • host

    (str) –

    The hostname of the database server.

  • user

    (str) –

    User name

  • password

    (str) –

    Password for the user

  • database

    (str) –

    Name of database to connect to.

  • port

    (int) –

    Port used to connect to the database. Defaults are 5432 for PostgreSQL and 1433 for MS SQL Server.

  • schema

    (str | None, default: None ) –

    Name of the schema to connect to. If set to None, will connect to the default schema. By default None.

  • driver

    (str | None, default: None ) –

    Driver used to connect to the database, by default None.

    Currently this is only applicable to MS SQL Server (something like "ODBC Driver 17 for SQL Server" should be used). If you want to skip pyodbc, set this to "pymssql".

  • conn_timeout

    (int, default: 30 ) –

    Timeout in seconds for connection, by default 30

  • query_timeout

    (int, default: 300 ) –

    Query timeout in seconds, by default 300

  • autocommit

    (bool, default: True ) –

    If set to True, connection will be committed automatically after every transaction. By default True.

  • application_name

    (str | None, default: None ) –

    Name of application. If set to None will be set to f"EchoSqlConnection:{socket.gethostname()}" where socket.gethostname() is the name of the computer running this code. By default None.

  • time_zone

    (str, default: 'America/Sao_Paulo' ) –

    Time zone to be used in the connection. By default "America/Sao_Paulo".

  • auto_close_connection

    (bool, default: True ) –

    If True, the connection will be automatically closed after every query. By default True.

__post_init__()

Method that checks if inputs are valid after initialization.

Source code in echo_connhandler/sql_core.py
def __post_init__(self) -> None:
    """Method that checks if inputs are valid after initialization."""
    if not isinstance(self.host, str):
        raise TypeError(f"host must be of type str, not {type(self.host)}")
    if not isinstance(self.user, str):
        raise TypeError(f"user must be of type str, not {type(self.user)}")
    try:
        self.password = str(self.password)
    except Exception:
        raise TypeError(f"password must be convertible to str, not {type(self.password)}")  # noqa: B904
    if not isinstance(self.database, str):
        raise TypeError(f"database must be of type str, not {type(self.database)}")
    if self.schema is not None and not isinstance(self.schema, str):
        raise TypeError(f"schema must be of type str or None, not {type(self.schema)}")
    if not isinstance(self.port, int):
        raise TypeError(f"port must be of type int, not {type(self.port)}")
    if self.driver is not None and not isinstance(self.driver, str):
        raise TypeError(f"driver must be of type str or None, not {type(self.driver)}")
    if not isinstance(self.conn_timeout, int):
        raise TypeError(f"conn_timeout must be of type int, not {type(self.conn_timeout)}")
    if not isinstance(self.query_timeout, int):
        raise TypeError(f"query_timeout must be of type int, not {type(self.query_timeout)}")
    if not isinstance(self.autocommit, bool):
        raise TypeError(f"autocommit must be of type bool, not {type(self.autocommit)}")
    if not isinstance(self.application_name, str | type(None)):
        raise TypeError(f"application_name must be of type str or None, not {type(self.application_name)}")
    if not isinstance(self.time_zone, str):
        raise TypeError(f"time_zone must be of type str, not {type(self.time_zone)}")

    # strip surrounding double quotes from string values (they may come from database-stored configs)
    for field_name in ("host", "user", "password", "database", "schema", "driver", "application_name"):
        value = getattr(self, field_name)
        if value is not None and len(value) >= 2 and value[0] == '"' and value[-1] == '"':
            object.__setattr__(self, field_name, value[1:-1])

    # adjusting application_name
    if self.application_name is None:
        self.application_name = f"EchoSqlConnection:{socket.gethostname()}"
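The validate-then-normalize pattern above can be reduced to a minimal standalone sketch. `ConnProps` and its fields are hypothetical stand-ins, not part of echo_connhandler:

```python
from dataclasses import dataclass


@dataclass
class ConnProps:
    """Minimal stand-in for SqlConnProperties (illustrative only)."""

    host: str
    port: int

    def __post_init__(self) -> None:
        # same validation style as SqlConnProperties.__post_init__
        if not isinstance(self.host, str):
            raise TypeError(f"host must be of type str, not {type(self.host)}")
        if not isinstance(self.port, int):
            raise TypeError(f"port must be of type int, not {type(self.port)}")
        # strip surrounding double quotes (values may come from database-stored configs)
        if len(self.host) >= 2 and self.host[0] == '"' and self.host[-1] == '"':
            self.host = self.host[1:-1]


props = ConnProps(host='"db.example.com"', port=5432)
print(props.host)  # surrounding quotes are stripped
```

The real class additionally coerces the password to str and applies the quote stripping to every string field.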

SqlHandler(connection_properties, max_retries=8, max_conn_retries=3, retry_wait_time=30, exponential_min_retry_wait_time=0.1, exponential_multiplier=1, skip_connect=True, **kwargs)

Base abstract class for SQL handlers. This class should not be instantiated directly.

This class should be used to handle connecting and doing queries to SQL servers. The _connect() method should be defined in subclasses as it might differ for each type of SQL (PostgreSQL, MySQL, MS SQL Server, etc.).

Unless skip_connect is True (the default), creating the handler already connects to the SQL server.

This handler implements a robust retry strategy using exponential backoff to gracefully manage connection or query failures. By default, operations are attempted up to 8 times; between failures, the wait time doubles (0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s) until it reaches the configured ceiling of 30 seconds. This approach allows rapid recovery from momentary glitches while preventing server overload during persistent outages.

Using the default parameters, the connection will fail after around 90 seconds if the server is not reachable.
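The wait series quoted above can be reproduced directly. This is a sketch of the schedule only; the handler itself delegates to tenacity.wait_exponential, whose exact arithmetic may differ slightly, and `backoff_schedule` is a hypothetical name, not part of the library:

```python
def backoff_schedule(attempts: int, min_wait: float = 0.1, ceiling: float = 30.0) -> list[float]:
    """Wait times that double from min_wait up to a ceiling, one per retry."""
    return [min(min_wait * 2**i, ceiling) for i in range(attempts)]

print(backoff_schedule(8))  # [0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8]
```

With more attempts the schedule flattens at the ceiling, which is what `retry_wait_time` controls in the handler.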

Parameters:

  • connection_properties

    (SqlConnProperties) –

    Object containing connection parameters.

  • max_retries

    (int, default: 8 ) –

    Number of retries that will be attempted when doing queries. Will be used in stop parameter of tenacity.stop_after_attempt, by default 8

  • max_conn_retries

    (int, default: 3 ) –

    Number of retries that will be attempted when reconnecting. Will be used in stop parameter of tenacity.stop_after_attempt, by default 3

  • retry_wait_time

    (float, default: 30 ) –

    Maximum time in seconds to wait between retries. Will be used in max parameter of tenacity.wait_exponential, by default 30

  • exponential_min_retry_wait_time

    (float, default: 0.1 ) –

    Min time to wait between retries when reconnecting or doing queries. Will be used in min parameter of tenacity.wait_exponential, by default 0.1

  • exponential_multiplier

    (float, default: 1 ) –

    Multiplier to use when calculating wait time between retries when reconnecting or doing queries. Will be used in multiplier parameter of tenacity.wait_exponential, by default 1

  • skip_connect

    (bool, default: True ) –

    If True, the connection will not be established when the object is created.

    If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.

    By default True

  • **kwargs

    (dict, default: {} ) –

    Just kept here for compatibility.

Source code in echo_connhandler/sql_core.py
def __init__(
    self,
    connection_properties: SqlConnProperties,
    max_retries: int = 8,
    max_conn_retries: int = 3,
    retry_wait_time: float = 30,
    exponential_min_retry_wait_time: float = 0.1,
    exponential_multiplier: float = 1,
    skip_connect: bool = True,
    **kwargs,  # pylint: disable=unused-argument # noqa
) -> None:
    """Method that initializes the SQL handler.

    Unless skip_connect is True (the default), creating the handler already connects to the SQL server.

    This handler implements a robust retry strategy using exponential backoff to gracefully manage connection or query failures. By default, operations are attempted up to 8 times; between failures, the wait time doubles (0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s) until it reaches the configured ceiling of 30 seconds. This approach allows rapid recovery from momentary glitches while preventing server overload during persistent outages.

    Using the default parameters, the connection will fail after around 90 seconds if the server is not reachable.

    Parameters
    ----------
    connection_properties : SqlConnProperties
        Object containing connection parameters.
    max_retries : int, optional
        Number of retries that will be attempted when doing queries. Will be used in `stop` parameter of tenacity.stop_after_attempt, by default 8
    max_conn_retries : int, optional
        Number of retries that will be attempted when reconnecting. Will be used in `stop` parameter of tenacity.stop_after_attempt, by default 3
    retry_wait_time : float, optional
        Maximum time in seconds to wait between retries. Will be used in `max` parameter of tenacity.wait_exponential, by default 30
    exponential_min_retry_wait_time: float, optional
        Min time to wait between retries when reconnecting or doing queries. Will be used in `min` parameter of tenacity.wait_exponential, by default 0.1
    exponential_multiplier: float, optional
        Multiplier to use when calculating wait time between retries when reconnecting or doing queries. Will be used in `multiplier` parameter of tenacity.wait_exponential, by default 1
    skip_connect : bool, optional
        If True, the connection will not be established when the object is created.

        If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.

        By default True
    **kwargs : dict, optional
        Just kept here for compatibility.

    """
    super().__init__(
        connection_properties=connection_properties,
        max_retries=max_retries,
        max_conn_retries=max_conn_retries,
        retry_wait_time=retry_wait_time,
        exponential_min_retry_wait_time=exponential_min_retry_wait_time,
        exponential_multiplier=exponential_multiplier,
        skip_connect=skip_connect,
    )
    self._last_cursor: Any = None
    self._adbc_connection: Any = None
    self._suppress_auto_close: bool = False

rowcount property

Property to return the number of rows affected by the last query.

check_df_columns(df, schema, table_name, must_have_all=False, allow_extra_columns=False)

Method used to check if the columns in a DataFrame match the columns in a table.

This will check both that the column names match and that their data types are compatible. Gracefully handles custom user-defined types (like ENUMs) by skipping the strict type check.

Source code in echo_connhandler/sql_core.py
@validate_call
def check_df_columns(
    self,
    df: pd.DataFrame | pl.DataFrame,
    schema: str | None,
    table_name: str,
    must_have_all: bool = False,
    allow_extra_columns: bool = False,
) -> None:
    """Method used to check if the columns in a DataFrame match the columns in a table.

    This will check both that the column names match and that their data types are compatible.
    Gracefully handles custom user-defined types (like ENUMs) by skipping the strict type check.
    """
    # checking if table exists
    if not self.table_exists(schema=schema, table_name=table_name):
        raise ValueError(f"Table {table_name} does not exist in schema {schema}")

    # getting type of DataFrame
    df_dtype = "pandas" if isinstance(df, pd.DataFrame) else "polars"

    # getting table columns
    table_columns: dict[str, str] = self.get_table_columns(schema=schema, table_names=[table_name])[table_name]

    # checking if all columns in the DataFrame are in the table
    if not allow_extra_columns:  # noqa
        if cols_not_in_table := set(df.columns) - set(table_columns):
            raise ValueError(f"DataFrame has columns not in table: {cols_not_in_table}")
    # checking if all columns in the table are in the DataFrame
    if must_have_all:  # noqa
        if cols_not_in_df := set(table_columns) - set(df.columns):
            raise ValueError(f"must_have_all = True and table has columns not in DataFrame: {cols_not_in_df}")

    # checking if the data types match
    if isinstance(df, pd.DataFrame):
        sql_dtypes = self._get_pandas_sql_dtypes(df)
    elif isinstance(df, pl.DataFrame):
        sql_dtypes = self._get_polars_sql_dtypes(df)
    else:
        raise TypeError(f"df must be of type pd.DataFrame or pl.DataFrame, not {type(df)}")

    # Collect all standard known SQL types to identify custom/ENUM types
    all_known_sql_dtypes = {x.lower() for val in self._data_type_mapping[df_dtype].values() for x in val["sql"]}

    for col, dtype in sql_dtypes.items():
        allowed_sql_dtypes = self._data_type_mapping[df_dtype][dtype]["sql"]
        db_col_dtype = table_columns[col].lower()

        if db_col_dtype not in (x.lower() for x in allowed_sql_dtypes):
            # If the database type is completely custom (e.g. 'type_inv_withdrawal_status')
            # we skip the strict mapping validation and trust the Postgres internal cast mechanism.
            if db_col_dtype not in all_known_sql_dtypes:
                logger.debug(f"Column '{col}' has custom database type '{table_columns[col]}'. Skipping strict dtype validation.")
                continue

            raise ValueError(
                f"Column {col} in DataFrame has dtype {df[col].dtype} which does not match the table's dtype {table_columns[col]}. Allowed SQL dtypes for this column dtype are {allowed_sql_dtypes}.",
            )
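The name comparison at the heart of check_df_columns is plain set arithmetic; a standalone sketch (`diff_columns` is a hypothetical helper, not part of the library):

```python
def diff_columns(df_columns: list[str], table_columns: dict[str, str]) -> tuple[set[str], set[str]]:
    """Columns present in the DataFrame but not the table, and vice versa."""
    extra = set(df_columns) - set(table_columns)    # would fail unless allow_extra_columns
    missing = set(table_columns) - set(df_columns)  # would fail if must_have_all
    return extra, missing


extra, missing = diff_columns(
    ["id", "name", "tmp"],
    {"id": "integer", "name": "text", "ts": "timestamp"},
)
print(extra, missing)  # {'tmp'} {'ts'}
```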

commit()

Method to commit the current transaction.

Source code in echo_connhandler/sql_core.py
def commit(self) -> None:
    """Method to commit the current transaction."""
    try:
        self.connection.commit()
    except Exception as e:
        raise RuntimeError(f"Could not commit transaction to {self}") from e

create_table(schema, table_name, columns, pkey_cols=None, if_exists='fail', temporary=False) abstractmethod

Method to create a table in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema to create the table in. If temporary is set to True, this must be set to None.

  • table_name

    (str) –

    Name of the table to be created.

  • columns

    (dict[str, str]) –

    Dict in the format {column_name: column_type, ...}.

  • pkey_cols

    (list[str] | None, default: None ) –

    List of columns to be set as primary key. If None, no primary key will be set. By default None.

  • if_exists

    (Literal['fail', 'replace'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and create a new one.

    By default "fail".

  • temporary

    (bool, default: False ) –

    If set to True, the table will be created as temporary. By default False.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def create_table(
    self,
    schema: str | None,
    table_name: str,
    columns: dict[str, str],
    pkey_cols: list[str] | None = None,
    if_exists: Literal["fail", "replace"] = "fail",
    temporary: bool = False,
) -> None:
    """Method to create a table in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema to create the table in. If temporary is set to True, this must be set to None.
    table_name : str
        Name of the table to be created.
    columns : dict[str, str]
        Dict in the format {column_name: column_type, ...}.
    pkey_cols : list[str] | None, optional
        List of columns to be set as primary key. If None, no primary key will be set. By default None.
    if_exists : Literal["fail", "replace"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and create a new one.

        By default "fail".
    temporary : bool, optional
        If set to True, the table will be created as temporary. By default False.

    """
    raise NotImplementedError("create_table must be implemented in subclass")

cursor()

Method to return the cursor to the current connection.

Source code in echo_connhandler/sql_core.py
def cursor(self) -> Any:
    """Method to return the cursor to the current connection."""
    try:
        return self.connection.cursor()
    except Exception:
        self.reconnect()
        return self.connection.cursor()

delete_table(schema, table_name, on_error='fail') abstractmethod

Method to delete a table in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema of the table to delete.

  • table_name

    (str) –

    Name of the table to be deleted.

  • on_error

    (Literal['fail', 'ignore'], default: 'fail' ) –

    What to do if the table does not exist.

    • fail: Will raise a ValueError.
    • ignore: Will ignore the error.

    By default "fail".

Source code in echo_connhandler/sql_core.py
@abstractmethod
def delete_table(
    self,
    schema: str | None,
    table_name: str,
    on_error: Literal["fail", "ignore"] = "fail",
) -> None:
    """Method to delete a table in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema of the table to delete.
    table_name: str
        Name of the table to be deleted.
    on_error : Literal["fail", "ignore"], optional
        What to do if the table does not exist.
        - fail: Will raise a ValueError.
        - ignore: Will ignore the error.
        By default "fail".

    """
    raise NotImplementedError("delete_table must be implemented in subclass")

execute(query, *args, output='cursor', **kwargs)

Method to execute a SQL query using cursor.

Parameters:

  • query

    (SQL | Composed | str) –

    Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

  • *args

    (tuple, default: () ) –

    Any additional positional arguments are passed to cursor.execute. Positioned here so keyword arguments after this are treated as keyword only.

  • output

    (Literal['cursor', 'result'], default: 'cursor' ) –

    What to return. Can be either "cursor" to return the cursor after executing the query or "result" to return the result of cursor.execute. By default "cursor".

  • **kwargs

    Any additional keyword arguments are passed to cursor.execute.

    • skip_retry (bool): If set to True, will ignore number of retries set in handler and will raise the error immediately.

Returns:

  • Any

    The result of cursor.execute or the cursor itself, depending on the output parameter.

Source code in echo_connhandler/sql_core.py
@validate_call
def execute(self, query: SQL | Composed | str, *args, output: Literal["cursor", "result"] = "cursor", **kwargs) -> Any:  # noqa
    """Method to execute a SQL query using cursor.

    Parameters
    ----------
    query : SQL | Composed | str
        Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
    *args : tuple
        Any additional positional arguments are passed to `cursor.execute`. Positioned here so keyword arguments after this are treated as keyword only.
    output : Literal["cursor", "result"], optional
        What to return. Can be either "cursor" to return the cursor after executing the query or "result" to return the result of cursor.execute. By default "cursor".
    **kwargs
        Any additional keyword arguments are passed to `cursor.execute`.
        - skip_retry : bool
            If set to True, will ignore number of retries set in handler and will raise the error immediately.

    Returns
    -------
    Any
        The result of `cursor.execute` or the cursor itself, depending on the `output` parameter.

    """
    skip_retry = kwargs.pop("skip_retry", False)

    # Configure tenacity retry logic with exponential backoff
    if skip_retry:
        # If skip_retry is True, execute without retry logic
        if isinstance(query, SQL | Composed):
            if isinstance(self.connection, pg.Connection):  # noqa
                query_str = query.as_string(self.connection)
            else:
                # ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
                query_str = query.as_string(None)
        elif isinstance(query, str):
            query_str = query
        else:
            raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

        cur = self.cursor()
        res = cur.execute(query_str, *args, **kwargs)
    else:
        retryer = Retrying(
            stop=stop_after_attempt(self.max_retries + 1),
            wait=wait_exponential(
                multiplier=self.exponential_multiplier,
                min=self.exponential_min_retry_wait_time,
                max=self.retry_wait_time,
            ),
            retry=retry_if_exception_type(Exception),
            reraise=True,
            before_sleep=lambda retry_state: logger.exception(
                f"Attempt {retry_state.attempt_number}: Could not execute query. Will wait {retry_state.next_action.sleep}s until next call. Arguments used {self.connection_properties}\n{query.as_string(self.connection) if isinstance(query, SQL | Composed) and isinstance(self._connection, pg.Connection) else query}",
            ),
        )

        for attempt in retryer:
            with attempt:
                # Reconnect after sleep (for retry attempts)
                if attempt.retry_state.attempt_number > 1:
                    self.reconnect()
                # everything is in a try block so if the connection is lost, it can be reconnected and the query can be executed again
                # this is the case for performance_db, where we have configured idle session timeout of 60 seconds

                if isinstance(query, SQL | Composed):
                    if isinstance(self.connection, pg.Connection):
                        query_str = query.as_string(self.connection)
                    else:
                        # ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
                        query_str = query.as_string(None)
                elif isinstance(query, str):
                    query_str = query
                else:
                    raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

                cur = self.cursor()
                res = cur.execute(query_str, *args, **kwargs)
    self._last_cursor = cur

    if output == "cursor":
        return cur
    if output == "result":
        self._auto_close_if_needed()
        return res
    raise ValueError(f"Invalid output value: {output}. Must be 'cursor' or 'result'.")

get_table_columns(schema, table_names) abstractmethod

Method to get the columns in a table and their data types.

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_names

    (list[str]) –

    Name of the desired tables.

Returns:

  • dict[str, dict[str, str]]

    Dict in the format {table_name: {column_name: column_type, ...}, ...}.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_columns(self, schema: str | None, table_names: list[str]) -> dict[str, dict[str, str]]:
    """Method to get the columns in a table and their data types.

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located.
    table_names : list[str]
        Name of the desired tables.

    Returns
    -------
    dict[str, dict[str, str]]
        Dict in the format {table_name: {column_name: column_type, ...}, ...}.

    """
    raise NotImplementedError("get_table_columns must be implemented in subclass")

get_table_names(schema, table_types=None) abstractmethod

Method to get the names of all tables in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema to get the tables from.

  • table_types

    (list[Literal['table', 'view']] | None, default: None ) –

    List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.

Returns:

  • list[str]

    List of table names.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_names(self, schema: str | None, table_types: list[Literal["table", "view"]] | None = None) -> list[str]:
    """Method to get the names of all tables in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema to get the tables from.
    table_types : list[Literal["table", "view"]] | None, optional
        List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.

    Returns
    -------
    list[str]
        List of table names.

    """
    raise NotImplementedError("get_table_names must be implemented in subclass")

get_table_pkey(schema, table_names, return_type='columns') abstractmethod

Method to get the primary key of a table.

Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL).

Parameters:

  • schema

    (str | None) –

    Name of the schema where the tables are located.

  • table_names

    (list[str]) –

    Name of the desired tables.

  • return_type

    (Literal['name', 'columns'], default: 'columns' ) –

    What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".

Returns:

  • dict[str, list[str] | str]

    In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}. In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_pkey(
    self,
    schema: str | None,
    table_names: list[str],
    return_type: Literal["name", "columns"] = "columns",
) -> dict[str, list[str] | str]:
    """Method to get the primary key of a table.

    Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL).

    Parameters
    ----------
    schema : str | None
        Name of the schema where the tables are located.
    table_names : list[str]
        Name of the desired tables.
    return_type : Literal["name", "columns"], optional
        What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".

    Returns
    -------
    dict[str, list[str] | str]
        In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}.
        In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.

    """
    raise NotImplementedError("get_table_pkey must be implemented in subclass")
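For PostgreSQL, one common way to implement this is to join information_schema.table_constraints with key_column_usage. A hypothetical sketch of the query text only (real implementations should bind the schema and table names as query parameters rather than interpolating them):

```python
def pkey_columns_query(schema: str, table_name: str) -> str:
    """Build SQL that lists primary-key columns of a table (illustrative only)."""
    return (
        "SELECT kcu.column_name "
        "FROM information_schema.table_constraints tc "
        "JOIN information_schema.key_column_usage kcu "
        "  ON tc.constraint_name = kcu.constraint_name "
        " AND tc.table_schema = kcu.table_schema "
        "WHERE tc.constraint_type = 'PRIMARY KEY' "
        f"  AND tc.table_schema = '{schema}' AND tc.table_name = '{table_name}'"
    )


print(pkey_columns_query("public", "users"))
```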

pandas_to_sql(df, schema, table_name, if_exists='fail', ignore_index=False, index_name=None, pkey_cols=None, conflict_cols=None, return_cols=None, ignore_null_cols=True, **kwargs)

Method to write a Pandas DataFrame to a SQL table.

Parameters:

  • df

    (DataFrame) –

    DataFrame to be written.

  • schema

    (str | None) –

    Name of the schema of the table.

  • table_name

    (str) –

    Name of the table to be written to.

  • if_exists

    (Literal['fail', 'replace', 'append', 'update', 'update_only', 'skip_row_check'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and write the DataFrame to a new table.
    • append: Will append the DataFrame to the existing table, skipping rows that already exist.
    • update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
    • update_only: Will update only the rows that already exist in the table. If a row does not exist, it will be skipped. Different from update, this does not use INSERT ON CONFLICT but uses only UPDATE statements, which allows for updates in tables with IDENTITY primary keys that do not allow inserts. When used must provide conflict_cols. Please note that columns in conflict_cols will not be updated.
    • skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

    By default "fail".

  • ignore_index

    (bool, default: False ) –

    Whether to ignore the index of the DataFrame, by default False.

  • index_name

    (str | None, default: None ) –

    Name to set the index to. If None, the index will not be renamed. By default None.

  • pkey_cols

    ( list[str] | None, default: None ) –

    Name of the columns to set as primary key. If None, no primary key will be set. If a column has the same name as the index, the index will be used.

    This is only applicable when replace is used or if the table does not exist.

    By default None.

  • conflict_cols

    (list[str] | None, default: None ) –

    List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used. By default None.

  • return_cols

    (list[str] | None, default: None ) –

    List of columns to return after the operation. If None, no columns will be returned. By default None.

  • ignore_null_cols

    (bool, default: True ) –

    If set to True, will remove columns that contain only null values before writing to SQL. By default True.

  • **kwargs

    Used for backwards compatibility. Should be ignored.

Returns:

  • DataFrame | None

    DataFrame with the specified return columns if return_cols is set. Otherwise, None

Source code in echo_connhandler/sql_core.py
@validate_call
def pandas_to_sql(
    self,
    df: pd.DataFrame,
    schema: str | None,
    table_name: str,
    if_exists: Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"] = "fail",
    ignore_index: bool = False,
    index_name: str | None = None,
    pkey_cols: list[str] | None = None,
    conflict_cols: list[str] | None = None,
    return_cols: list[str] | None = None,
    ignore_null_cols: bool = True,
    **kwargs,
) -> pd.DataFrame | None:
    """Method to write a Pandas DataFrame to a SQL table.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to be written.
    schema: str | None
        Name of the schema of the table.
    table_name : str
        Name of the table to be written to.
    if_exists : Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and write the DataFrame to a new table.
        - append: Will append the DataFrame to the existing table, skipping rows that already exist.
        - update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
        - update_only: Will update only the rows that already exist in the table. If a row does not exist, it will be skipped. Different from update, this does not use INSERT ON CONFLICT but uses only UPDATE statements, which allows for updates in tables with IDENTITY primary keys that do not allow inserts. When used must provide conflict_cols. Please note that columns in conflict_cols will not be updated.
        - skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

        By default "fail".
    ignore_index : bool, optional
        Whether to ignore the index of the DataFrame, by default False.
    index_name : str | None, optional
        Name to set the index to. If None, the index will not be renamed. By default None.
    pkey_cols :  list[str] | None, optional
        Name of the columns to set as primary key. If None, no primary key will be set. If a column has the same name as the index, the index will be used.

        This is only applicable when replace is used or if the table does not exist.

        By default None.
    conflict_cols : list[str] | None, optional
        List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used.
        By default None.
    return_cols : list[str] | None, optional
        List of columns to return after the operation. If None, no columns will be returned.
        By default None.
    ignore_null_cols : bool, optional
        If set to True, will remove columns that contain only null values before writing to SQL.
        By default True.
    **kwargs
        Used for backwards compatibility. Should be ignored.

    Returns
    -------
    pd.DataFrame | None
        DataFrame with the specified return columns if return_cols is set; otherwise, None.

    """
    t1 = perf_counter()
    self._suppress_auto_close = True

    try:
        # pre-processing
        df = self._pre_df_to_sql(
            df=df,
            schema=schema,
            table_name=table_name,
            if_exists=if_exists,
            ignore_index=ignore_index,
            index_name=index_name,
            pkey_cols=pkey_cols,
            conflict_cols=conflict_cols,
            remove_null_columns=ignore_null_cols,
            **kwargs,
        )

        # checking if DataFrame is empty
        if df.empty:
            logger.debug("DataFrame is empty. Nothing to do.")
            return None

        result = self._pandas_to_sql(
            df=df,
            schema=schema,
            table_name=table_name,
            if_exists=if_exists,
            conflict_cols=conflict_cols,
            return_cols=return_cols,
            **kwargs,
        )

        logger.debug(
            f"DataFrame of shape {df.shape} written to table {table_name} in schema {schema} in {perf_counter() - t1:.2f} seconds.",
        )

        return result
    finally:
        self._suppress_auto_close = False
        self._auto_close_if_needed()

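For reference, the `update` mode described above corresponds to an upsert. The sketch below shows how such a statement could be built for PostgreSQL; `build_upsert_sql` and the `%(name)s` placeholder style are assumptions for illustration, not the library's actual SQL generation:

```python
def build_upsert_sql(table: str, columns: list[str], conflict_cols: list[str]) -> str:
    """Sketch of an upsert statement: insert new rows, update existing ones."""
    col_list = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    # columns used to detect the conflict are matched, never updated
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in columns if c not in conflict_cols)
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(conflict_cols)}) DO UPDATE SET {updates}"
    )

sql = build_upsert_sql("public.users", ["id", "name", "email"], ["id"])
# "id" appears in ON CONFLICT but not in the SET clause
```

Note how the columns in `conflict_cols` only identify the row, matching the documented behavior that they are never updated.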
polars_to_sql(df, schema, table_name, if_exists='fail', pkey_cols=None, conflict_cols=None, return_cols=None, ignore_null_cols=True, **kwargs)

Method to write a Polars DataFrame to a SQL table.

Parameters:

  • df

    (DataFrame) –

    Polars DataFrame to be written.

  • schema

    (str | None) –

    Name of the schema of the table.

  • table_name

    (str) –

    Name of the table to be written to.

  • if_exists

    (Literal['fail', 'replace', 'append', 'update', 'update_only', 'skip_row_check'], default: 'fail' ) –

    What to do if the table already exists.

    • fail: Will raise a ValueError.
    • replace: Will drop the table and write the DataFrame to a new table.
    • append: Will append the DataFrame to the existing table, skipping rows that already exist.
    • update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
    • update_only: Will update only the rows that already exist in the table; rows that do not exist are skipped. Unlike update, this does not use INSERT ON CONFLICT but only UPDATE statements, which allows updating tables with IDENTITY primary keys that do not allow inserts. When used, conflict_cols must be provided. Please note that columns in conflict_cols will not be updated.
    • skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

    By default "fail".

  • pkey_cols

    (list[str] | None, default: None ) –

    Name of the columns to set as primary key. If None, no primary key will be set.

    This is only applicable when replace is used or if the table does not exist.

    By default None.

  • conflict_cols

    (list[str] | None, default: None ) –

    List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used. By default None.

  • return_cols

    (list[str] | None, default: None ) –

    List of columns to return after the operation. If None, no columns will be returned. By default None.

  • ignore_null_cols

    (bool, default: True ) –

    If set to True, columns that contain only null values will be ignored when writing to the database. By default True.

  • **kwargs

    Used for backwards compatibility. Should be ignored.

Returns:

  • DataFrame | None

    Polars DataFrame with the specified return columns if return_cols is set; otherwise, None.

Source code in echo_connhandler/sql_core.py
@validate_call
def polars_to_sql(
    self,
    df: pl.DataFrame,
    schema: str | None,
    table_name: str,
    if_exists: Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"] = "fail",
    pkey_cols: list[str] | None = None,
    conflict_cols: list[str] | None = None,
    return_cols: list[str] | None = None,
    ignore_null_cols: bool = True,
    **kwargs,
) -> pl.DataFrame | None:
    """Method to write a Polars DataFrame to a SQL table.

    Parameters
    ----------
    df : pl.DataFrame
        Polars DataFrame to be written.
    schema : str | None
        Name of the schema of the table.
    table_name : str
        Name of the table to be written to.
    if_exists : Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"], optional
        What to do if the table already exists.

        - fail: Will raise a ValueError.
        - replace: Will drop the table and write the DataFrame to a new table.
        - append: Will append the DataFrame to the existing table, skipping rows that already exist.
        - update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
        - update_only: Will update only the rows that already exist in the table; rows that do not exist are skipped. Unlike update, this does not use INSERT ON CONFLICT but only UPDATE statements, which allows updating tables with IDENTITY primary keys that do not allow inserts. When used, conflict_cols must be provided. Please note that columns in conflict_cols will not be updated.
        - skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.

        By default "fail".
    pkey_cols : list[str] | None, optional
        Name of the columns to set as primary key. If None, no primary key will be set.

        This is only applicable when replace is used or if the table does not exist.

        By default None.
    conflict_cols : list[str] | None, optional
        List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used.
        By default None.
    return_cols : list[str] | None, optional
        List of columns to return after the operation. If None, no columns will be returned.
        By default None.
    ignore_null_cols : bool, optional
        If set to True, columns that contain only null values will be ignored when writing to the database.
        By default True.
    **kwargs
        Used for backwards compatibility. Should be ignored.

    Returns
    -------
    pl.DataFrame | None
        Polars DataFrame with the specified return columns if return_cols is set; otherwise, None.

    """
    t1 = perf_counter()
    self._suppress_auto_close = True

    try:
        # pre-processing
        df = self._pre_df_to_sql(
            df=df,
            schema=schema,
            table_name=table_name,
            if_exists=if_exists,
            pkey_cols=pkey_cols,
            conflict_cols=conflict_cols,
            index_name=None,
            ignore_index=True,
            remove_null_columns=ignore_null_cols,
            **kwargs,
        )

        # checking if DataFrame is empty
        if len(df) == 0:
            logger.debug("DataFrame is empty. Nothing to do.")
            return None

        result = self._polars_to_sql(
            df=df,
            schema=schema,
            table_name=table_name,
            if_exists=if_exists,
            conflict_cols=conflict_cols,
            return_cols=return_cols,
            **kwargs,
        )

        logger.debug(
            f"DataFrame of shape {df.shape} written to table {table_name} in schema {schema} in {perf_counter() - t1:.2f} seconds.",
        )

        return result
    finally:
        self._suppress_auto_close = False
        self._auto_close_if_needed()

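The `update_only` mode avoids INSERT entirely, as described in the parameter docs above. A rough sketch of the kind of statement it implies (`build_update_only_sql` is a hypothetical name, not taken from the library):

```python
def build_update_only_sql(table: str, columns: list[str], conflict_cols: list[str]) -> str:
    """Sketch of a pure UPDATE keyed on conflict_cols; unmatched rows are simply skipped."""
    # conflict_cols identify the row, so they go in the WHERE clause and are never updated
    set_clause = ", ".join(f"{c} = %({c})s" for c in columns if c not in conflict_cols)
    where_clause = " AND ".join(f"{c} = %({c})s" for c in conflict_cols)
    return f"UPDATE {table} SET {set_clause} WHERE {where_clause}"

sql = build_update_only_sql("public.orders", ["order_id", "status"], ["order_id"])
```

Because no INSERT is issued, this works even on tables with IDENTITY primary keys that reject explicit inserts, which is the use case the docs call out.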
read_to_pandas(query, *args, use_adbc=False, **kwargs)

Read a SQL query into a Pandas DataFrame.

Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection to execute the query and return the result as a DataFrame.

ADBC provides faster reads for large datasets by using Apache Arrow's columnar format natively, avoiding row-by-row conversion. It opens a separate short-lived connection per query (managed by the ADBC driver), so it cannot see session-scoped objects like temporary tables or session variables (e.g. SHOW statements in PostgreSQL).

Parameters:

  • query

    (SQL | Composed | str) –

    Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

  • use_adbc

    (bool, default: False ) –

    If True, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets). If False (the default), uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed.

    Set to False when you need access to session state (temp tables) or when the handler does not support ADBC.

    By default False.

Other Parameters:

  • dtype_backend (str | None) –

    Backend to use for type conversion. Can be 'pyarrow', 'numpy_nullable' or None (which behaves as if the argument was not passed). By default 'pyarrow'.

  • dtype (dict[str, str]) –

    Dict in the format {column_name: dtype, ...} to be used to convert the columns to the desired data type.

  • index_col (str | list[str]) –

    Column(s) to set as index of the DataFrame.

  • parse_dates (list[str] | dict[str, str]) –

    Column(s) to parse as date(s). If a dict is passed, the keys should be column names and the values should be the date format.

  • post_convert (str | None) –

    Can be set to 'pyarrow', 'numpy_nullable' or None. This will call df.convert_dtypes after reading the DataFrame.

    Useful when there is a JSON/JSONB column that is not being read correctly with dtype_backend='pyarrow'. In this case set dtype_backend=None and post_convert='pyarrow'.

  • remove_time_zone (bool) –

    If True (default), removes the time zone from time-zone-aware datetime columns, keeping the values in the time zone defined in the connection properties.

    By default, this is set to True to avoid issues with time zones when reading datetime columns. Set to False if you want to keep the time zone information in the DataFrame.

  • skip_retry (bool) –

    If True, ignores the retry count and raises errors immediately.

Returns:

  • DataFrame

    DataFrame containing the results of the query.

Source code in echo_connhandler/sql_core.py
@validate_call
def read_to_pandas(
    self,
    query: SQL | Composed | str,
    *args,  # noqa: ANN002, ARG002
    use_adbc: bool = False,
    **kwargs: Unpack[ReadToPandasKwargs],
) -> pd.DataFrame:
    """Read a SQL query into a Pandas DataFrame.

    Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection
    to execute the query and return the result as a DataFrame.

    ADBC provides faster reads for large datasets by using Apache Arrow's columnar format
    natively, avoiding row-by-row conversion. It opens a separate short-lived connection per
    query (managed by the ADBC driver), so it cannot see session-scoped objects like temporary
    tables or session variables (e.g. ``SHOW`` statements in PostgreSQL).

    Parameters
    ----------
    query : SQL | Composed | str
        Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
    use_adbc : bool, optional
        If ``True``, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets).
        If ``False`` (the default), uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed.

        Set to ``False`` when you need access to session state (temp tables) or when the handler does not support ADBC.

        By default False.


    Other Parameters
    ----------------
    dtype_backend : str | None
        Backend to use for type conversion. Can be ``'pyarrow'``, ``'numpy_nullable'`` or ``None`` (which behaves as if the argument was not passed).
        By default ``'pyarrow'``.
    dtype : dict[str, str]
        Dict in the format ``{column_name: dtype, ...}`` to be used to convert the columns to the desired data type.
    index_col : str | list[str]
        Column(s) to set as index of the DataFrame.
    parse_dates : list[str] | dict[str, str]
        Column(s) to parse as date(s). If a dict is passed, the keys should be column names and the values should be the date format.
    post_convert : str | None
        Can be set to ``'pyarrow'``, ``'numpy_nullable'`` or ``None``. This will call ``df.convert_dtypes`` after reading the DataFrame.

        Useful when there is a JSON/JSONB column that is not being read correctly with ``dtype_backend='pyarrow'``.
        In this case set ``dtype_backend=None`` and ``post_convert='pyarrow'``.
    remove_time_zone : bool
        If ``True`` (default), removes the time zone from time-zone-aware datetime columns,
        keeping the values in the time zone defined in the connection properties.

        By default, this is set to ``True`` to avoid issues with time zones when reading datetime columns. Set to ``False`` if you want to keep the time zone information in the DataFrame.
    skip_retry : bool
        If ``True``, ignores the retry count and raises errors immediately.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the results of the query.

    """
    # defining dtype_backend to pyarrow as default
    if "dtype_backend" not in kwargs:
        if "post_convert" not in kwargs or kwargs["post_convert"] is None:
            kwargs["dtype_backend"] = "pyarrow"
        else:
            kwargs["dtype_backend"] = None
    # deleting dtype_backend if it was set to None
    if kwargs["dtype_backend"] is None:
        del kwargs["dtype_backend"]

    remove_time_zone = kwargs.pop("remove_time_zone", True)

    # getting post_convert
    post_convert = kwargs.pop("post_convert", None)
    if post_convert not in [None, "pyarrow", "numpy_nullable"]:
        raise ValueError(f"post_convert must be one of [None, 'pyarrow', 'numpy_nullable'], not {post_convert}")
    if "dtype_backend" in kwargs and post_convert is not None:
        raise ValueError("post_convert can only be used if dtype_backend is not set")

    skip_retry = kwargs.pop("skip_retry", False)

    def _resolve_query_str() -> str:
        if isinstance(query, SQL | Composed):
            if isinstance(self._connection, pg.Connection):
                return query.as_string(self._connection)
            return query.as_string(None)
        if isinstance(query, str):
            return query
        raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

    def _read_pandas(conn: object) -> pd.DataFrame:
        return pd.read_sql(sql=_resolve_query_str(), con=conn, **kwargs)

    def _read_pandas_adbc() -> pd.DataFrame:
        if self._cache_adbc_connection:
            if self._adbc_connection is None:
                self._adbc_connection = self._create_adbc_connection()
            if self._adbc_connection is None:
                raise RuntimeError("ADBC connection not available for this handler")
            return pd.read_sql(sql=_resolve_query_str(), con=self._adbc_connection, **kwargs)
        conn = self._create_adbc_connection()
        if conn is None:
            raise RuntimeError("ADBC connection not available for this handler")
        try:
            return pd.read_sql(sql=_resolve_query_str(), con=conn, **kwargs)
        finally:
            with contextlib.suppress(Exception):
                conn.close()

    # Configure tenacity retry logic with exponential backoff
    if skip_retry:
        df: pd.DataFrame = _read_pandas_adbc() if use_adbc else _read_pandas(self.connection)
    else:
        retryer = Retrying(
            stop=stop_after_attempt(self.max_retries + 1),
            wait=wait_exponential(
                multiplier=self.exponential_multiplier,
                min=self.exponential_min_retry_wait_time,
                max=self.retry_wait_time,
            ),
            retry=retry_if_exception_type(Exception),
            reraise=True,
            before_sleep=lambda retry_state: logger.exception(
                f"Attempt {retry_state.attempt_number}: Could not execute query. Will wait {retry_state.next_action.sleep}s until next call. Arguments used {self.connection_properties}\n{query.as_string(self.connection) if isinstance(query, SQL | Composed) and isinstance(self._connection, pg.Connection) else query}",
            ),
        )

        for attempt in retryer:
            with attempt:
                # Reconnect after sleep (for retry attempts)
                if attempt.retry_state.attempt_number > 1:
                    if use_adbc:
                        if self._adbc_connection is not None:
                            with contextlib.suppress(Exception):
                                self._adbc_connection.close()
                            self._adbc_connection = None
                    else:
                        self.reconnect()

                if use_adbc:
                    df: pd.DataFrame = _read_pandas_adbc()
                else:
                    df: pd.DataFrame = _read_pandas(self.connection)

    # ADBC binary COPY always returns TIMESTAMPTZ as UTC regardless of SET timezone;
    # convert to the configured session timezone so behavior matches psycopg
    if use_adbc and self._convert_time_zone:
        target_tz = getattr(self.connection_properties, "time_zone", "UTC")
        for col in df.columns:
            if "duration" in str(df[col].dtype).lower() or "timedelta" in str(df[col].dtype).lower():
                continue
            if hasattr(df[col], "dt") and df[col].dt.tz is not None and str(df[col].dt.tz) == "UTC" and target_tz != "UTC":
                df[col] = df[col].dt.tz_convert(target_tz)

    # checking if we should remove the time zone
    if remove_time_zone:
        # iterating columns
        for col in df.columns:
            # skipping duration/timedelta columns
            if "duration" in str(df[col].dtype).lower() or "timedelta" in str(df[col].dtype).lower():
                continue
            # sourcery skip: merge-nested-ifs
            # checking if column has a dt accessor
            if hasattr(df[col], "dt"):  # noqa: SIM102
                # checking if column is time zone aware
                if df[col].dt.tz is not None:
                    # removing time zone
                    # here we have to convert to datetime64 because pyarrow has a bug that makes the final result stay at UTC
                    df[col] = (
                        df[col].astype(f"datetime64[{df[col].dt.unit}, {self.connection_properties.time_zone}]").dt.tz_localize(None)
                    )

    # converting the DataFrame to the desired data types
    if post_convert is not None:
        df = df.convert_dtypes(dtype_backend=post_convert)

    # doing any post-processing of the DataFrame
    df = self._process_pandas_read(df, **kwargs)

    self._auto_close_if_needed()
    return df

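The `remove_time_zone` behavior amounts to converting aware timestamps to the session time zone and then dropping the tzinfo. A stdlib sketch of the same idea, using a fixed UTC-3 offset as a stand-in for America/Sao_Paulo (`strip_time_zone` is a hypothetical helper, not part of the library):

```python
from datetime import datetime, timedelta, timezone

SESSION_TZ = timezone(timedelta(hours=-3))  # fixed-offset stand-in for America/Sao_Paulo

def strip_time_zone(ts: datetime) -> datetime:
    """Convert an aware timestamp to the session time zone, then drop the tzinfo."""
    return ts.astimezone(SESSION_TZ).replace(tzinfo=None)

strip_time_zone(datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc))
# → datetime(2024, 1, 15, 9, 0): the naive wall-clock time in UTC-3
```

The resulting columns are naive but hold local wall-clock values, which is why reading with the default `remove_time_zone=True` still reflects the connection's configured time zone.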
read_to_polars(query, *args, use_adbc=False, **kwargs)

Read a SQL query into a Polars DataFrame.

Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection to execute the query and return the result as a DataFrame.

ADBC provides faster reads for large datasets by using Apache Arrow's columnar format natively, which is especially beneficial with Polars since it is Arrow-native. It opens a separate short-lived connection per query, so it cannot see session-scoped objects like temporary tables.

Parameters:

  • query

    (SQL | Composed | str) –

    Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.

  • use_adbc

    (bool, default: False ) –

    If True, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets). If False (the default), uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed.

    Set to False when you need access to session state (temp tables) or when the handler does not support ADBC.

    By default False.

Other Parameters:

  • schema_overrides (dict[str, DataTypeClass] | None) –

    A dictionary mapping column names to dtypes, used to override the schema inferred from the query. An example would be {"col1": pl.Int64, "col2": pl.Float64}.

  • infer_schema_length (int | None) –

    The maximum number of rows to scan for schema inference. If set to None, all rows are scanned, which avoids inference errors caused by values that only appear later in the data. By default None.

  • remove_time_zone (bool) –

    If True (default), removes the time zone from time-zone-aware datetime columns, keeping the values in the time zone defined in the connection properties.

  • skip_retry (bool) –

    If True, ignores the retry count and raises errors immediately.

Returns:

  • DataFrame

    DataFrame containing the results of the query.

Source code in echo_connhandler/sql_core.py
@validate_call
def read_to_polars(
    self,
    query: SQL | Composed | str,
    *args,  # noqa: ANN002, ARG002
    use_adbc: bool = False,
    **kwargs: Unpack[ReadToPolarsKwargs],
) -> pl.DataFrame:
    """Read a SQL query into a Polars DataFrame.

    Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection
    to execute the query and return the result as a DataFrame.

    ADBC provides faster reads for large datasets by using Apache Arrow's columnar format
    natively, which is especially beneficial with Polars since it is Arrow-native. It opens a
    separate short-lived connection per query, so it cannot see session-scoped objects like
    temporary tables.

    Parameters
    ----------
    query : SQL | Composed | str
        Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
    use_adbc : bool, optional
        If ``True``, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets).
        If ``False`` (the default), uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed.

        Set to ``False`` when you need access to session state (temp tables) or when the handler does not support ADBC.

        By default False.

    Other Parameters
    ----------------
    schema_overrides : dict[str, pl.DataTypeClass] | None
        A dictionary mapping column names to dtypes, used to override the schema inferred from the query.
        An example would be ``{"col1": pl.Int64, "col2": pl.Float64}``.
    infer_schema_length : int | None, optional
        The maximum number of rows to scan for schema inference. If set to ``None``, all rows are scanned,
        which avoids inference errors caused by values that only appear later in the data. By default ``None``.
    remove_time_zone : bool
        If ``True`` (default), removes the time zone from time-zone-aware datetime columns,
        keeping the values in the time zone defined in the connection properties.
    skip_retry : bool
        If ``True``, ignores the retry count and raises errors immediately.

    Returns
    -------
    pl.DataFrame
        DataFrame containing the results of the query.

    """
    skip_retry = kwargs.pop("skip_retry", False)
    remove_time_zone = kwargs.pop("remove_time_zone", True)
    # set infer_schema_length to None if not set
    if "infer_schema_length" not in kwargs:
        kwargs["infer_schema_length"] = None

    def _resolve_query_str() -> str:
        if isinstance(query, SQL | Composed):
            if isinstance(self._connection, pg.Connection):
                return query.as_string(self._connection)
            return query.as_string(None)
        if isinstance(query, str):
            return query
        raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")

    def _read_polars(conn: object) -> pl.DataFrame:
        return pl.read_database(query=_resolve_query_str(), connection=conn, **kwargs)

    def _read_polars_adbc() -> pl.DataFrame:
        if self._cache_adbc_connection:
            if self._adbc_connection is None:
                self._adbc_connection = self._create_adbc_connection()
            if self._adbc_connection is None:
                raise RuntimeError("ADBC connection not available for this handler")
            return pl.read_database(query=_resolve_query_str(), connection=self._adbc_connection, **kwargs)
        conn = self._create_adbc_connection()
        if conn is None:
            raise RuntimeError("ADBC connection not available for this handler")
        try:
            return pl.read_database(query=_resolve_query_str(), connection=conn, **kwargs)
        finally:
            with contextlib.suppress(Exception):
                conn.close()

    # removing datetime columns from schema overrides to not mess up with time zone conversion later
    datetime_schema_cols = None
    if "schema_overrides" in kwargs and kwargs["schema_overrides"] is not None:
        datetime_schema_cols = {
            col: dtype
            for col, dtype in kwargs["schema_overrides"].items()
            # Catches the raw class OR an instantiated timezone-aware object
            if dtype == pl.Datetime or isinstance(dtype, pl.Datetime)
        }
        if datetime_schema_cols:
            for col in datetime_schema_cols:
                kwargs["schema_overrides"].pop(col)

    # Configure tenacity retry logic with exponential backoff
    if skip_retry:
        df: pl.DataFrame = _read_polars_adbc() if use_adbc else _read_polars(self.connection)
    else:
        retryer = Retrying(
            stop=stop_after_attempt(self.max_retries + 1),
            wait=wait_exponential(
                multiplier=self.exponential_multiplier,
                min=self.exponential_min_retry_wait_time,
                max=self.retry_wait_time,
            ),
            retry=retry_if_exception_type(Exception),
            reraise=True,
            before_sleep=lambda retry_state: logger.exception(
                f"Attempt {retry_state.attempt_number}: Could not execute query. Will wait {retry_state.next_action.sleep}s until next call. Arguments used {self.connection_properties}\n{query.as_string(self.connection) if isinstance(query, SQL | Composed) and isinstance(self._connection, pg.Connection) else query}",
            ),
        )

        for attempt in retryer:
            with attempt:
                # Reconnect after sleep (for retry attempts)
                if attempt.retry_state.attempt_number > 1:
                    if use_adbc:
                        if self._adbc_connection is not None:
                            with contextlib.suppress(Exception):
                                self._adbc_connection.close()
                            self._adbc_connection = None
                    else:
                        self.reconnect()

                if use_adbc:
                    df: pl.DataFrame = _read_polars_adbc()
                else:
                    df: pl.DataFrame = _read_polars(self.connection)

    # ADBC binary COPY always returns TIMESTAMPTZ as UTC regardless of SET timezone;
    # convert to the configured session timezone so behavior matches psycopg
    if use_adbc and self._convert_time_zone:
        target_tz = getattr(self.connection_properties, "time_zone", "UTC")
        if target_tz != "UTC":
            tz_casts = [
                pl.col(col_name).dt.convert_time_zone(target_tz)
                for col_name, dtype in df.schema.items()
                if isinstance(dtype, pl.Datetime) and dtype.time_zone == "UTC"
            ]
            if tz_casts:
                df = df.with_columns(tz_casts)

    # removing time zone if requested
    if remove_time_zone:
        tz_strips = [
            pl.col(col_name).dt.replace_time_zone(None)
            for col_name, dtype in df.schema.items()
            if isinstance(dtype, pl.Datetime) and dtype.time_zone is not None
        ]
        if tz_strips:
            df = df.with_columns(tz_strips)

    # forcing schema for datetime columns if they were in the schema overrides, since we had to remove the time zone information from them for the previous steps
    if datetime_schema_cols:
        df = df.with_columns(
            *[pl.col(col_name).cast(dtype) for col_name, dtype in datetime_schema_cols.items() if col_name in df.columns],
        )

    self._auto_close_if_needed()
    return df

table_exists(schema, table_name) abstractmethod

Method to check if a table, view, or materialized view exists in the database.

Parameters:

  • schema

    (str | None) –

    Name of the schema where the table should be located.

  • table_name

    (str) –

    Name of the table to check.

Returns:

  • bool

    True if the table exists, False otherwise.

Source code in echo_connhandler/sql_core.py
@abstractmethod
def table_exists(self, schema: str | None, table_name: str) -> bool:
    """Method to check if a table, view, or materialized view exists in the database.

    Parameters
    ----------
    schema : str | None
        Name of the schema where the table should be located.
    table_name : str
        Name of the table to check.

    Returns
    -------
    bool
        True if the table exists, False otherwise.
    """
    raise NotImplementedError("table_exists must be implemented in subclass")