SQL Core
SqlConnProperties(host, user, password, database, port, schema=None, driver=None, conn_timeout=30, query_timeout=300, autocommit=True, application_name=None, time_zone='America/Sao_Paulo', auto_close_connection=True)
dataclass
Class that holds the properties used to establish a connection to a SQL database.
Parameters:
- host (str) – The hostname of the database server.
- user (str) – Username used to authenticate.
- password (str) – Password for the user.
- database (str) – Name of the database to connect to.
- port (int) – Port used to connect to the database. Defaults are 5432 for PostgreSQL and 1433 for MS SQL Server.
- schema (str | None, default: None) – Name of the schema to connect to. If set to None, connects to the default schema. By default None.
- driver (str | None, default: None) – Driver used to connect to the database, by default None. Currently this is only applicable to MS SQL Server (something like "ODBC Driver 17 for SQL Server" should be used). If you want to skip pyodbc, set this to "pymssql".
- conn_timeout (int, default: 30) – Connection timeout in seconds, by default 30.
- query_timeout (int, default: 300) – Query timeout in seconds, by default 300.
- autocommit (bool, default: True) – If set to True, the connection will be committed automatically after every transaction. By default True.
- application_name (str | None, default: None) – Name of the application. If set to None, it will be set to f"EchoSqlConnection:{socket.gethostname()}", where socket.gethostname() is the name of the computer running this code. By default None.
- time_zone (str, default: 'America/Sao_Paulo') – Time zone to be used in the connection. By default "America/Sao_Paulo".
- auto_close_connection (bool, default: True) – If True, the connection will be automatically closed after every query. By default True.
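A minimal construction example (the host, credentials, and database names below are illustrative placeholders, not values from this library):
conn_props = SqlConnProperties(
    host="db.example.internal",
    user="report_user",
    password="s3cret",
    database="analytics",
    port=5432,
    schema="public",
    application_name="nightly-report",
)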
__post_init__()
Method that checks if inputs are valid after initialization.
Source code in echo_connhandler/sql_core.py
def __post_init__(self) -> None:
"""Method that checks if inputs are valid after initialization."""
if not isinstance(self.host, str):
raise TypeError(f"host must be of type str, not {type(self.host)}")
if not isinstance(self.user, str):
raise TypeError(f"user must be of type str, not {type(self.user)}")
try:
self.password = str(self.password)
except Exception:
raise TypeError(f"password must be convertible to str, not {type(self.password)}") # noqa: B904
if not isinstance(self.database, str):
raise TypeError(f"database must be of type str, not {type(self.database)}")
if self.schema is not None and not isinstance(self.schema, str):
raise TypeError(f"schema must be of type str or None, not {type(self.schema)}")
if not isinstance(self.port, int):
raise TypeError(f"port must be of type int, not {type(self.port)}")
if self.driver is not None and not isinstance(self.driver, str):
raise TypeError(f"driver must be of type str or None, not {type(self.driver)}")
if not isinstance(self.conn_timeout, int):
raise TypeError(f"conn_timeout must be of type int, not {type(self.conn_timeout)}")
if not isinstance(self.query_timeout, int):
raise TypeError(f"query_timeout must be of type int, not {type(self.query_timeout)}")
if not isinstance(self.autocommit, bool):
raise TypeError(f"autocommit must be of type bool, not {type(self.autocommit)}")
if not isinstance(self.application_name, str | type(None)):
raise TypeError(f"application_name must be of type str or None, not {type(self.application_name)}")
if not isinstance(self.time_zone, str):
raise TypeError(f"time_zone must be of type str, not {type(self.time_zone)}")
# strip surrounding double quotes from string values (they may come from database-stored configs)
for field_name in ("host", "user", "password", "database", "schema", "driver", "application_name"):
value = getattr(self, field_name)
if value is not None and len(value) >= 2 and value[0] == '"' and value[-1] == '"':
object.__setattr__(self, field_name, value[1:-1])
# adjusting application_name
if self.application_name is None:
self.application_name = f"EchoSqlConnection:{socket.gethostname()}"
SqlHandler(connection_properties, max_retries=8, max_conn_retries=3, retry_wait_time=30, exponential_min_retry_wait_time=0.1, exponential_multiplier=1, skip_connect=True, **kwargs)
Base abstract class for SQL handlers. This class should not be instantiated directly.
This class should be used to handle connecting to and querying SQL servers. The _connect() method should be defined in subclasses, as it might differ for each type of SQL (PostgreSQL, MySQL, MS SQL Server, etc.).
Unless skip_connect is set to True (the default), initialization already connects to the SQL server.
This handler implements a robust retry strategy using exponential backoff to gracefully manage connection or query failures. By default, a failed operation is retried up to 8 times; between failures, the wait time doubles exponentially, starting at 0.1 seconds (0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s), until it reaches the configured ceiling of 30 seconds. This approach allows rapid recovery from momentary glitches while preventing server overload during persistent outages.
With the default parameters, the connection attempt will fail after around 90 seconds if the server is not reachable.
Parameters:
- connection_properties (SqlConnProperties) – Object containing connection parameters.
- max_retries (int, default: 8) – Number of retries that will be attempted when doing queries. Will be used in the stop parameter of tenacity.stop_after_attempt, by default 8.
- max_conn_retries (int, default: 3) – Number of retries that will be attempted when reconnecting. Will be used in the stop parameter of tenacity.stop_after_attempt, by default 3.
- retry_wait_time (float, default: 30) – Maximum time in seconds to wait between retries when reconnecting or doing queries. Will be used in the max parameter of tenacity.wait_exponential, by default 30.
- exponential_min_retry_wait_time (float, default: 0.1) – Minimum time to wait between retries when reconnecting or doing queries. Will be used in the min parameter of tenacity.wait_exponential, by default 0.1.
- exponential_multiplier (float, default: 1) – Multiplier to use when calculating the wait time between retries when reconnecting or doing queries. Will be used in the multiplier parameter of tenacity.wait_exponential, by default 1.
- skip_connect (bool, default: True) – If True, the connection will not be established when the object is created. In that case, the user will need to manually call the reconnect() method when they want to connect to the server. By default True.
- **kwargs (dict, default: {}) – Kept here only for compatibility.
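As an illustration, a concrete subclass (shown here as a hypothetical PostgresSqlHandler; the actual subclass names live in other modules of the package) could be used like this, reusing the conn_props object from the example above:
handler = PostgresSqlHandler(
    connection_properties=conn_props,
    max_retries=4,
    retry_wait_time=10,
    skip_connect=True,
)
handler.reconnect()  # skip_connect=True, so the connection must be opened explicitly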
Source code in echo_connhandler/sql_core.py
def __init__(
self,
connection_properties: SqlConnProperties,
max_retries: int = 8,
max_conn_retries: int = 3,
retry_wait_time: float = 30,
exponential_min_retry_wait_time: float = 0.1,
exponential_multiplier: float = 1,
skip_connect: bool = True,
**kwargs, # pylint: disable=unused-argument # noqa
) -> None:
"""Method that initializes the SQL handler.
Unless skip_connect is set to True (the default), initialization already connects to the SQL server.
This handler implements a robust retry strategy using exponential backoff to gracefully manage connection or query failures. By default, a failed operation is retried up to 8 times; between failures, the wait time doubles exponentially, starting at 0.1 seconds (0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s), until it reaches the configured ceiling of 30 seconds. This approach allows rapid recovery from momentary glitches while preventing server overload during persistent outages.
With the default parameters, the connection attempt will fail after around 90 seconds if the server is not reachable.
Parameters
----------
connection_properties : SqlConnProperties
Object containing connection parameters.
max_retries : int, optional
Number of retries that will be attempted when doing queries. Will be used in `stop` parameter of tenacity.stop_after_attempt, by default 8
max_conn_retries : int, optional
Number of retries that will be attempted when reconnecting. Will be used in `stop` parameter of tenacity.stop_after_attempt, by default 3
exponential_min_retry_wait_time: float, optional
Min time to wait between retries when reconnecting or doing queries. Will be used in `min` parameter of tenacity.wait_exponential, by default 0.1
exponential_multiplier: float, optional
Multiplier to use when calculating wait time between retries when reconnecting or doing queries. Will be used in `multiplier` parameter of tenacity.wait_exponential, by default 1
skip_connect : bool, optional
If True, the connection will not be established when the object is created.
If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.
By default True
**kwargs : dict, optional
Just kept here for compatibility.
"""
super().__init__(
connection_properties=connection_properties,
max_retries=max_retries,
max_conn_retries=max_conn_retries,
retry_wait_time=retry_wait_time,
exponential_min_retry_wait_time=exponential_min_retry_wait_time,
exponential_multiplier=exponential_multiplier,
skip_connect=skip_connect,
)
self._last_cursor: Any = None
self._adbc_connection: Any = None
self._suppress_auto_close: bool = False
rowcount
property
Property to return the number of rows affected by the last query.
check_df_columns(df, schema, table_name, must_have_all=False, allow_extra_columns=False)
Method used to check if the columns in a DataFrame match the columns in a table.
This checks both that the column names match and that their data types are compatible. Custom user-defined types (like ENUMs) are handled gracefully by skipping the strict type check.
Source code in echo_connhandler/sql_core.py
@validate_call
def check_df_columns(
self,
df: pd.DataFrame | pl.DataFrame,
schema: str | None,
table_name: str,
must_have_all: bool = False,
allow_extra_columns: bool = False,
) -> None:
"""Method used to check if the columns in a DataFrame match the columns in a table.
This will check both the names of the columns match and if their data types are compatible.
Gracefully handles custom user-defined types (like ENUMs) by skipping the strict type check.
"""
# checking if table exists
if not self.table_exists(schema=schema, table_name=table_name):
raise ValueError(f"Table {table_name} does not exist in schema {schema}")
# getting type of DataFrame
df_dtype = "pandas" if isinstance(df, pd.DataFrame) else "polars"
# getting table columns
table_columns: dict[str, str] = self.get_table_columns(schema=schema, table_names=[table_name])[table_name]
# checking if all columns in the DataFrame are in the table
if not allow_extra_columns: # noqa
if cols_not_in_table := set(df.columns) - set(table_columns):
raise ValueError(f"DataFrame has columns not in table: {cols_not_in_table}")
# checking if all columns in the table are in the DataFrame
if must_have_all: # noqa
if cols_not_in_df := set(table_columns) - set(df.columns):
raise ValueError(f"must_have_all = True and table has columns not in DataFrame: {cols_not_in_df}")
# checking if the data types match
if isinstance(df, pd.DataFrame):
sql_dtypes = self._get_pandas_sql_dtypes(df)
elif isinstance(df, pl.DataFrame):
sql_dtypes = self._get_polars_sql_dtypes(df)
else:
raise TypeError(f"df must be of type pd.DataFrame or pl.DataFrame, not {type(df)}")
# Collect all standard known SQL types to identify custom/ENUM types
all_known_sql_dtypes = {x.lower() for val in self._data_type_mapping[df_dtype].values() for x in val["sql"]}
for col, dtype in sql_dtypes.items():
allowed_sql_dtypes = self._data_type_mapping[df_dtype][dtype]["sql"]
db_col_dtype = table_columns[col].lower()
if db_col_dtype not in (x.lower() for x in allowed_sql_dtypes):
# If the database type is completely custom (e.g. 'type_inv_withdrawal_status')
# we skip the strict mapping validation and trust the Postgres internal cast mechanism.
if db_col_dtype not in all_known_sql_dtypes:
logger.debug(f"Column '{col}' has custom database type '{table_columns[col]}'. Skipping strict dtype validation.")
continue
raise ValueError(
f"Column {col} in DataFrame has dtype {df[col].dtype} which does not match the table's dtype {table_columns[col]}. Allowed SQL dtypes for this column dtype are {allowed_sql_dtypes}.",
)
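An illustrative call, continuing the example handler from above (schema and table names are placeholders); a ValueError is raised when the columns or dtypes do not line up:
import pandas as pd

df = pd.DataFrame({"asset_id": [1, 2], "close_price": [10.5, 20.1]})
handler.check_df_columns(df=df, schema="public", table_name="daily_prices", must_have_all=False)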
commit()
Method to commit the current transaction.
Source code in echo_connhandler/sql_core.py
def commit(self) -> None:
"""Method to commit the current transaction."""
try:
self.connection.commit()
except Exception as e:
raise RuntimeError(f"Could not commit transaction to {self}") from e
create_table(schema, table_name, columns, pkey_cols=None, if_exists='fail', temporary=False)
abstractmethod
Method to create a table in the database.
Parameters:
- schema (str | None) – Name of the schema to create the table in. If temporary is set to True, this must be set to None.
- table_name (str) – Name of the table to be created.
- columns (dict[str, str]) – Dict in the format {column_name: column_type, ...}.
- pkey_cols (list[str] | None, default: None) – List of columns to be set as primary key. If None, no primary key will be set. By default None.
- if_exists (Literal['fail', 'replace'], default: 'fail') – What to do if the table already exists.
  - fail: Will raise a ValueError.
  - replace: Will drop the table and create a new one.
  By default "fail".
- temporary (bool, default: False) – If set to True, the table will be created as temporary. By default False.
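For example, a concrete subclass implementing this method could be called like this (all names and column types are placeholders):
handler.create_table(
    schema="public",
    table_name="daily_prices",
    columns={"asset_id": "INTEGER", "price_date": "DATE", "close_price": "NUMERIC(18, 6)"},
    pkey_cols=["asset_id", "price_date"],
    if_exists="fail",
)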
Source code in echo_connhandler/sql_core.py
@abstractmethod
def create_table(
self,
schema: str | None,
table_name: str,
columns: dict[str, str],
pkey_cols: list[str] | None = None,
if_exists: Literal["fail", "replace"] = "fail",
temporary: bool = False,
) -> None:
"""Method to create a table in the database.
Parameters
----------
schema : str | None
Name of the schema to create the table in. If temporary is set to True, this must be set to None.
table_name : str
Name of the table to be created.
columns : dict[str, str]
Dict in the format {column_name: column_type, ...}.
pkey_cols : list[str] | None, optional
List of columns to be set as primary key. If None, no primary key will be set. By default None.
if_exists : Literal["fail", "replace"], optional
What to do if the table already exists.
- fail: Will raise a ValueError.
- replace: Will drop the table and create a new one.
By default "fail".
temporary : bool, optional
If set to True, the table will be created as temporary. By default False.
"""
raise NotImplementedError("create_table must be implemented in subclass")
cursor()
Method to return the cursor to the current connection.
Source code in echo_connhandler/sql_core.py
def cursor(self) -> Any:
"""Method to return the cursor to the current connection."""
try:
return self.connection.cursor()
except Exception:
self.reconnect()
return self.connection.cursor()
delete_table(schema, table_name, on_error='fail')
abstractmethod
Method to delete a table in the database.
Parameters:
- schema (str | None) – Name of the schema of the table to delete.
- table_name (str) – Name of the table to be deleted.
- on_error (Literal['fail', 'ignore'], default: 'fail') – What to do if the table does not exist.
  - fail: Will raise a ValueError.
  - ignore: Will ignore the error.
  By default "fail".
Source code in echo_connhandler/sql_core.py
@abstractmethod
def delete_table(
self,
schema: str | None,
table_name: str,
on_error: Literal["fail", "ignore"] = "fail",
) -> None:
"""Method to delete a table in the database.
Parameters
----------
schema : str | None
Name of the schema of the table to delete.
table_name: str
Name of the table to be deleted.
on_error : Literal["fail", "ignore"], optional
What to do if the table does not exist.
- fail: Will raise a ValueError.
- ignore: Will ignore the error.
By default "fail".
"""
raise NotImplementedError("delete_table must be implemented in subclass")
execute(query, *args, output='cursor', **kwargs)
Method to execute a SQL query using a cursor.
Parameters:
- query (SQL | Composed | str) – Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
- *args (tuple, default: ()) – Any additional positional arguments are passed to cursor.execute. Positioned here so keyword arguments after this are treated as keyword only.
- output (Literal['cursor', 'result'], default: 'cursor') – What to return. Can be either "cursor" to return the cursor after executing the query or "result" to return the result of cursor.execute. By default "cursor".
- **kwargs – Any additional keyword arguments are passed to cursor.execute.
  - skip_retry (bool): If set to True, will ignore the number of retries set in the handler and raise the error immediately.
Returns:
- Any – The result of cursor.execute or the cursor itself, depending on the output parameter.
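An illustrative call for a PostgreSQL handler, using psycopg's sql module to compose the query (table name is a placeholder):
from psycopg import sql

cur = handler.execute(
    sql.SQL("SELECT * FROM {} WHERE price_date >= %s").format(sql.Identifier("public", "daily_prices")),
    ("2024-01-01",),
    output="cursor",
)
rows = cur.fetchall()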
Source code in echo_connhandler/sql_core.py
@validate_call
def execute(self, query: SQL | Composed | str, *args, output: Literal["cursor", "result"] = "cursor", **kwargs) -> Any: # noqa
"""Method to execute a SQL query using cursor.
Parameters
----------
query : SQL | Composed | str
Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
*args : tuple
Any additional positional arguments are passed to `cursor.execute`. Positioned here so keyword arguments after this are treated as keyword only.
output : Literal["cursor", "result"], optional
What to return. Can be either "cursor" to return the cursor after executing the query or "result" to return the result of cursor.execute. By default "cursor".
**kwargs
Any additional keyword arguments are passed to `cursor.execute`.
- skip_retry : bool
If set to True, will ignore number of retries set in handler and will raise the error immediately.
Returns
-------
Any
The result of `cursor.execute` or the cursor itself, depending on the `output` parameter.
"""
skip_retry = kwargs.pop("skip_retry") if "skip_retry" in kwargs else False
# Configure tenacity retry logic with exponential backoff
if skip_retry:
# If skip_retry is True, execute without retry logic
if isinstance(query, SQL | Composed):
if isinstance(self.connection, pg.Connection): # noqa
query_str = query.as_string(self.connection)
else:
# ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
query_str = query.as_string(None)
elif isinstance(query, str):
query_str = query
else:
raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")
cur = self.cursor()
res = cur.execute(query_str, *args, **kwargs)
else:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries + 1),
wait=wait_exponential(
multiplier=self.exponential_multiplier,
min=self.exponential_min_retry_wait_time,
max=self.retry_wait_time,
),
retry=retry_if_exception_type(Exception),
reraise=True,
before_sleep=lambda retry_state: logger.exception(
f"Attempt {retry_state.attempt_number}: Could not execute query. Will wait {retry_state.next_action.sleep}s until next call. Arguments used {self.connection_properties}\n{query.as_string(self.connection) if isinstance(query, SQL | Composed) and isinstance(self._connection, pg.Connection) else query}",
),
)
for attempt in retryer:
with attempt:
# Reconnect after sleep (for retry attempts)
if attempt.retry_state.attempt_number > 1:
self.reconnect()
# everything is in a try block so if the connection is lost, it can be reconnected and the query can be executed again
# this is the case for performance_db, where we have configured idle session timeout of 60 seconds
if isinstance(query, SQL | Composed):
if isinstance(self.connection, pg.Connection):
query_str = query.as_string(self.connection)
else:
# ! This will fail if query is a sql.Composed object containing a sql.Identifier, in this case change sql.Identifier to sql.SQL
query_str = query.as_string(None)
elif isinstance(query, str):
query_str = query
else:
raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")
cur = self.cursor()
res = cur.execute(query_str, *args, **kwargs)
self._last_cursor = cur
if output == "cursor":
return cur
if output == "result":
self._auto_close_if_needed()
return res
raise ValueError(f"Invalid output value: {output}. Must be 'cursor' or 'result'.")
get_table_columns(schema, table_names)
abstractmethod
Method to get the columns in a table and their data types.
Parameters:
- schema (str | None) – Name of the schema where the tables are located.
- table_names (list[str]) – Names of the desired tables.
Returns:
- dict[str, dict[str, str]] – Dict in the format {table_name: {column_name: column_type, ...}, ...}.
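An illustrative call on a concrete subclass, with a sketch of the returned shape (all names are placeholders):
cols = handler.get_table_columns(schema="public", table_names=["daily_prices"])
# e.g. cols == {"daily_prices": {"asset_id": "integer", "price_date": "date", "close_price": "numeric"}}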
Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_columns(self, schema: str | None, table_names: list[str]) -> dict[str, dict[str, str]]:
"""Method to get the columns in a table and it's data types.
Parameters
----------
schema : str | None
Name of the schema where the tables are located.
table_names : list[str]
Name of the desired tables.
Returns
-------
dict[str, dict[str, str]]
Dict in the format {table_name: {column_name: column_type, ...}, ...}.
"""
raise NotImplementedError("get_table_columns must be implemented in subclass")
get_table_names(schema, table_types=None)
abstractmethod
Method to get the names of all tables in the database.
Parameters:
- schema (str | None) – Name of the schema to get the tables from.
- table_types (list[Literal['table', 'view']] | None, default: None) – List of table types to be returned. Each entry can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.
Returns:
- list[str] – List of table names.
Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_names(self, schema: str | None, table_types: list[Literal["table", "view"]] | None = None) -> list[str]:
"""Method to get the names of all tables in the database.
Parameters
----------
schema : str | None
Name of the schema to get the tables from.
table_types : list[Literal["table", "view"]] | None, optional
List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.
Returns
-------
list[str]
List of table names.
"""
raise NotImplementedError("get_table_names must be implemented in subclass")
get_table_pkey(schema, table_names, return_type='columns')
abstractmethod
Method to get the primary key of a table.
Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL).
Parameters:
- schema (str | None) – Name of the schema where the tables are located.
- table_names (list[str]) – Names of the desired tables.
- return_type (Literal['name', 'columns'], default: 'columns') – What to return. Can be either "name" for the name of the primary key or "columns" for the columns involved, by default "columns".
Returns:
- dict[str, list[str] | str] – In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}. In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.
Source code in echo_connhandler/sql_core.py
@abstractmethod
def get_table_pkey(
self,
schema: str | None,
table_names: list[str],
return_type: Literal["name", "columns"] = "columns",
) -> dict[str, list[str] | str]:
"""Method to get the primary key of a table.
Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL)
Parameters
----------
schema : str | None
Name of the schema where the tables are located.
table_names : list[str]
Name of the desired tables.
return_type : Literal["name", "columns"], optional
What to return. Can be either "name" of the primary key or "columns" for the columns involved, by default "columns".
Returns
-------
dict[str, list[str] | str]
In case of return_type="columns", returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}.
In case of return_type="name", returns a dict in the format {table_name: pkey_name, ...}.
"""
raise NotImplementedError("get_table_pkey must be implemented in subclass")
pandas_to_sql(df, schema, table_name, if_exists='fail', ignore_index=False, index_name=None, pkey_cols=None, conflict_cols=None, return_cols=None, ignore_null_cols=True, **kwargs)
Method to write a Pandas DataFrame to a SQL table.
Parameters:
- df (DataFrame) – DataFrame to be written.
- schema (str | None) – Name of the schema of the table.
- table_name (str) – Name of the table to be written to.
- if_exists (Literal['fail', 'replace', 'append', 'update', 'update_only', 'skip_row_check'], default: 'fail') – What to do if the table already exists.
  - fail: Will raise a ValueError.
  - replace: Will drop the table and write the DataFrame to a new table.
  - append: Will append the DataFrame to the existing table, skipping rows that already exist.
  - update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
  - update_only: Will update only the rows that already exist in the table. If a row does not exist, it will be skipped. Different from update, this does not use INSERT ON CONFLICT but uses only UPDATE statements, which allows for updates in tables with IDENTITY primary keys that do not allow inserts. When used, conflict_cols must be provided. Please note that columns in conflict_cols will not be updated.
  - skip_row_check: Will skip the check of whether the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.
  By default "fail".
- ignore_index (bool, default: False) – Whether to ignore the index of the DataFrame, by default False.
- index_name (str | None, default: None) – Name to set the index to. If None, the index will not be renamed. By default None.
- pkey_cols (list[str] | None, default: None) – Names of the columns to set as primary key. If None, no primary key will be set. If it is the same name as the index, the index will be used. This is only applicable when replace is used or if the table does not exist. By default None.
- conflict_cols (list[str] | None, default: None) – List of columns to be used to check for conflicts instead of the primary key. If not set, the primary key will be used. By default None.
- return_cols (list[str] | None, default: None) – List of columns to return after the operation. If None, no columns will be returned. By default None.
- ignore_null_cols (bool, default: True) – If set to True, will remove columns that contain only null values before writing to SQL. By default True.
- **kwargs – Kept for backwards compatibility. Should be ignored.
Returns:
- DataFrame | None – DataFrame with the specified return columns if return_cols is set. Otherwise, None.
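Illustrative usage performing an upsert keyed on asset_id and price_date (all table and column names are placeholders):
import pandas as pd

df = pd.DataFrame(
    {"asset_id": [1, 2], "price_date": ["2024-01-02", "2024-01-02"], "close_price": [10.5, 20.1]}
)
handler.pandas_to_sql(
    df=df,
    schema="public",
    table_name="daily_prices",
    if_exists="update",
    ignore_index=True,
    conflict_cols=["asset_id", "price_date"],
)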
Source code in echo_connhandler/sql_core.py
@validate_call
def pandas_to_sql(
self,
df: pd.DataFrame,
schema: str | None,
table_name: str,
if_exists: Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"] = "fail",
ignore_index: bool = False,
index_name: str | None = None,
pkey_cols: list[str] | None = None,
conflict_cols: list[str] | None = None,
return_cols: list[str] | None = None,
ignore_null_cols: bool = True,
**kwargs,
) -> pd.DataFrame | None:
"""Method to write a Pandas DataFrame to a SQL table.
Parameters
----------
df : pd.DataFrame
DataFrame to be written.
schema: str | None
Name of the schema of the table.
table_name : str
Name of the table to be written to.
if_exists : Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"], optional
What to do if the table already exists.
- fail: Will raise a ValueError.
- replace: Will drop the table and write the DataFrame to a new table.
- append: Will append the DataFrame to the existing table, skipping rows that already exist.
- update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
- update_only: Will update only the rows that already exist in the table. If a row does not exist, it will be skipped. Different from update, this does not use INSERT ON CONFLICT but uses only UPDATE statements, which allows for updates in tables with IDENTITY primary keys that do not allow inserts. When used must provide conflict_cols. Please note that columns in conflict_cols will not be updated.
- skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.
By default "fail".
ignore_index : bool, optional
Whether to ignore the index of the DataFrame, by default False.
index_name : str | None, optional
Name to set the index to. If None, the index will not be renamed. By default None.
pkey_cols : list[str] | None, optional
Name of the columns to set as primary key. If None, no primary key will be set. If is the same name as the index, index will be used.
This is only applicable when replace is used or if the table does not exist.
By default None.
conflict_cols : list[str] | None, optional
List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used.
By default None.
return_cols: list[str] | None, optional
List of columns to return after the operation. If None, no columns will be returned.
By default None.
ignore_null_cols: bool, optional
If set to True, will remove columns that contain only null values before writing to SQL.
By default True.
**kwargs
Used for backwards compatibility. Should be ignored.
Returns
-------
pd.DataFrame | None
DataFrame with the specified return columns if return_cols is set. Otherwise, None
"""
t1 = perf_counter()
self._suppress_auto_close = True
try:
# pre-processing
df = self._pre_df_to_sql(
df=df,
schema=schema,
table_name=table_name,
if_exists=if_exists,
ignore_index=ignore_index,
index_name=index_name,
pkey_cols=pkey_cols,
conflict_cols=conflict_cols,
remove_null_columns=ignore_null_cols,
**kwargs,
)
# checking if DataFrame is empty
if df.empty:
logger.debug("DataFrame is empty. Nothing to do.")
return None
result = self._pandas_to_sql(
df=df,
schema=schema,
table_name=table_name,
if_exists=if_exists,
conflict_cols=conflict_cols,
return_cols=return_cols,
**kwargs,
)
logger.debug(
f"DataFrame of shape {df.shape} written to table {table_name} in schema {schema} in {perf_counter() - t1:.2f} seconds.",
)
return result
finally:
self._suppress_auto_close = False
self._auto_close_if_needed()
polars_to_sql(df, schema, table_name, if_exists='fail', pkey_cols=None, conflict_cols=None, return_cols=None, ignore_null_cols=True, **kwargs)
Method to write a Polars DataFrame to a SQL table.
Parameters:
- df (DataFrame) – Polars DataFrame to be written.
- schema (str | None) – Name of the schema of the table.
- table_name (str) – Name of the table to be written to.
- if_exists (Literal['fail', 'replace', 'append', 'update', 'update_only', 'skip_row_check'], default: 'fail') – What to do if the table already exists.
  - fail: Will raise a ValueError.
  - replace: Will drop the table and write the DataFrame to a new table.
  - append: Will append the DataFrame to the existing table, skipping rows that already exist.
  - update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
  - update_only: Will only update rows that already exist in the table. If a row does not exist, it will be skipped. Different from update, this does not use INSERT ON CONFLICT but uses only UPDATE statements, which allows for updates in tables with IDENTITY primary keys that do not allow inserts. When used, conflict_cols must be provided. Please note that columns in conflict_cols will not be updated.
  - skip_row_check: Will skip the check of whether the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.
  By default "fail".
- pkey_cols (list[str] | None, default: None) – Names of the columns to set as primary key. If None, no primary key will be set. This is only applicable when replace is used or if the table does not exist. By default None.
- conflict_cols (list[str] | None, default: None) – List of columns to be used to check for conflicts instead of the primary key. If not set, the primary key will be used. By default None.
- return_cols (list[str] | None, default: None) – List of columns to return after the operation. If None, no columns will be returned. By default None.
- ignore_null_cols (bool, default: True) – If set to True, columns that contain only null values will be ignored when writing to the database. By default True.
- **kwargs – Kept for backwards compatibility. Should be ignored.
Returns:
- DataFrame | None – Polars DataFrame with the specified return columns if return_cols is set. Otherwise, None.
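Illustrative usage with a Polars DataFrame (names are placeholders):
import polars as pl

df = pl.DataFrame({"asset_id": [1, 2], "close_price": [10.5, 20.1]})
handler.polars_to_sql(df=df, schema="public", table_name="daily_prices", if_exists="append")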
Source code in echo_connhandler/sql_core.py
@validate_call
def polars_to_sql(
self,
df: pl.DataFrame,
schema: str | None,
table_name: str,
if_exists: Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"] = "fail",
pkey_cols: list[str] | None = None,
conflict_cols: list[str] | None = None,
return_cols: list[str] | None = None,
ignore_null_cols: bool = True,
**kwargs,
) -> pl.DataFrame | None:
"""Method to write a Polars DataFrame to a SQL table.
Parameters
----------
df : pl.DataFrame
Polars DataFrame to be written.
schema: str | None
Name of the schema of the table.
table_name : str
Name of the table to be written to.
if_exists : Literal["fail", "replace", "append", "update", "update_only", "skip_row_check"], optional
What to do if the table already exists.
- fail: Will raise a ValueError.
- replace: Will drop the table and write the DataFrame to a new table.
- append: Will append the DataFrame to the existing table, skipping rows that already exist.
- update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
- update_only: Will only update rows that already exist in the table. If a row does not exist, it will be skipped. Different from update, this does not use INSERT ON CONFLICT but uses only UPDATE statements, which allows for updates in tables with IDENTITY primary keys that do not allow inserts. When used must provide conflict_cols. Please note that columns in conflict_cols will not be updated.
- skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.
By default "fail".
pkey_cols : list[str] | None, optional
Name of the columns to set as primary key. If None, no primary key will be set.
This is only applicable when replace is used or if the table does not exist.
By default None.
conflict_cols : list[str] | None, optional
List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used.
By default None.
return_cols: list[str] | None, optional
List of columns to return after the operation. If None, no columns will be returned.
By default None.
ignore_null_cols: bool, optional
If set to True, columns that contain only null values will be ignored when writing to the database.
By default True.
**kwargs
Used for backwards compatibility. Should be ignored.
Returns
-------
pl.DataFrame | None
Polars DataFrame with the specified return columns if return_cols is set. Otherwise, None
"""
t1 = perf_counter()
self._suppress_auto_close = True
try:
# pre-processing
df = self._pre_df_to_sql(
df=df,
schema=schema,
table_name=table_name,
if_exists=if_exists,
pkey_cols=pkey_cols,
conflict_cols=conflict_cols,
index_name=None,
ignore_index=True,
remove_null_columns=ignore_null_cols,
**kwargs,
)
# checking if DataFrame is empty
if len(df) == 0:
logger.debug("DataFrame is empty. Nothing to do.")
return None
result = self._polars_to_sql(
df=df,
schema=schema,
table_name=table_name,
if_exists=if_exists,
conflict_cols=conflict_cols,
return_cols=return_cols,
**kwargs,
)
logger.debug(
f"DataFrame of shape {df.shape} written to table {table_name} in schema {schema} in {perf_counter() - t1:.2f} seconds.",
)
return result
finally:
self._suppress_auto_close = False
self._auto_close_if_needed()
read_to_pandas(query, *args, use_adbc=False, **kwargs)
Read a SQL query into a Pandas DataFrame.
Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection to execute the query and return the result as a DataFrame.
ADBC provides faster reads for large datasets by using Apache Arrow's columnar format natively, avoiding row-by-row conversion. It opens a separate short-lived connection per query (managed by the ADBC driver), so it cannot see session-scoped objects like temporary tables or session variables (e.g. SHOW statements in PostgreSQL).
Parameters:
- query (SQL | Composed | str) – Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
- use_adbc (bool, default: False) – If True, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets). If False, uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed. Set to False when you need access to session state (temp tables) or when the handler does not support ADBC. By default False.
Other Parameters:
- dtype_backend (str | None) – Backend to use for type conversion. Can be 'pyarrow', 'numpy_nullable' or None (which simulates as if this argument was not passed). By default 'pyarrow'.
- dtype (dict[str, str]) – Dict in the format {column_name: dtype, ...} to be used to convert the columns to the desired data type.
- index_col (str | list[str]) – Column(s) to set as index of the DataFrame.
- parse_dates (list[str] | dict[str, str]) – Column(s) to parse as date(s). If a dict is passed, the keys should be column names and the values should be the date format.
- post_convert (str | None) – Can be set to 'pyarrow', 'numpy_nullable' or None. This will call df.convert_dtypes after reading the DataFrame. Useful when there is a JSON/JSONB column that is not being read correctly with dtype_backend='pyarrow'. In this case, set dtype_backend=None and post_convert='pyarrow'.
- remove_time_zone (bool) – If True (default), removes the time zone from time-zone-aware datetime columns, keeping the values in the time zone defined in the connection properties. By default this is set to True to avoid issues with time zones when reading datetime columns. Set to False if you want to keep the time zone information in the DataFrame.
- skip_retry (bool) – If True, ignores the retry count and raises errors immediately.
Returns:
- DataFrame – DataFrame containing the results of the query.
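Illustrative usage (the query text is a placeholder); use_adbc=True opens a separate ADBC connection, so the query must not rely on temp tables or other session state:
df = handler.read_to_pandas(
    "SELECT asset_id, price_date, close_price FROM public.daily_prices",
    use_adbc=True,
)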
Source code in echo_connhandler/sql_core.py
@validate_call
def read_to_pandas(
self,
query: SQL | Composed | str,
*args, # noqa: ANN002, ARG002
use_adbc: bool = False,
**kwargs: Unpack[ReadToPandasKwargs],
) -> pd.DataFrame:
"""Read a SQL query into a Pandas DataFrame.
Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection
to execute the query and return the result as a DataFrame.
ADBC provides faster reads for large datasets by using Apache Arrow's columnar format
natively, avoiding row-by-row conversion. It opens a separate short-lived connection per
query (managed by the ADBC driver), so it cannot see session-scoped objects like temporary
tables or session variables (e.g. ``SHOW`` statements in PostgreSQL).
Parameters
----------
query : SQL | Composed | str
Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
use_adbc : bool, optional
If ``True``, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets).
If ``False``, uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed.
Set to ``False`` when you need access to session state (temp tables) or when the handler does not support ADBC.
By default False.
Other Parameters
----------------
dtype_backend : str | None
Backend to use for type conversion. Can be ``'pyarrow'``, ``'numpy_nullable'`` or ``None`` (which simulates as if this argument was not passed).
By default ``'pyarrow'``.
dtype : dict[str, str]
Dict in the format ``{column_name: dtype, ...}`` to be used to convert the columns to the desired data type.
index_col : str | list[str]
Column(s) to set as index of the DataFrame.
parse_dates : list[str] | dict[str, str]
Column(s) to parse as date(s). If a dict is passed, the keys should be column names and the values should be the date format.
post_convert : str | None
Can be set to ``'pyarrow'``, ``'numpy_nullable'`` or ``None``. This will call ``df.convert_dtypes`` after reading the DataFrame.
Useful when there is a JSON/JSONB column that is not being read correctly with ``dtype_backend='pyarrow'``.
In this case set ``dtype_backend=None`` and ``post_convert='pyarrow'``.
remove_time_zone : bool
If ``True`` (default), removes the time zone from time-zone-aware datetime columns,
keeping the values in the time zone defined in the connection properties.
By default, this is set to ``True`` to avoid issues with time zones when reading datetime columns. Set to ``False`` if you want to keep the time zone information in the DataFrame.
skip_retry : bool
If ``True``, ignores the retry count and raises errors immediately.
Returns
-------
pd.DataFrame
DataFrame containing the results of the query.
"""
# defining dtype_backend to pyarrow as default
if "dtype_backend" not in kwargs:
if "post_convert" not in kwargs or kwargs["post_convert"] is None:
kwargs["dtype_backend"] = "pyarrow"
else:
kwargs["dtype_backend"] = None
# deleting dtype_backend if was set to None
if kwargs["dtype_backend"] is None:
del kwargs["dtype_backend"]
remove_time_zone = True if "remove_time_zone" not in kwargs else kwargs.pop("remove_time_zone")
# getting post_convert
post_convert = kwargs.pop("post_convert", None)
if post_convert not in [None, "pyarrow", "numpy_nullable"]:
raise ValueError(f"post_convert must be one of [None, 'pyarrow', 'numpy_nullable'], not {post_convert}")
if "dtype_backend" in kwargs and post_convert is not None:
raise ValueError("post_convert can only be used if dtype_backend is not set")
skip_retry = kwargs.pop("skip_retry") if "skip_retry" in kwargs else False
def _resolve_query_str() -> str:
if isinstance(query, SQL | Composed):
if isinstance(self._connection, pg.Connection):
return query.as_string(self._connection)
return query.as_string(None)
if isinstance(query, str):
return query
raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")
def _read_pandas(conn: object) -> pd.DataFrame:
return pd.read_sql(sql=_resolve_query_str(), con=conn, **kwargs)
def _read_pandas_adbc() -> pd.DataFrame:
if self._cache_adbc_connection:
if self._adbc_connection is None:
self._adbc_connection = self._create_adbc_connection()
if self._adbc_connection is None:
raise RuntimeError("ADBC connection not available for this handler")
return pd.read_sql(sql=_resolve_query_str(), con=self._adbc_connection, **kwargs)
conn = self._create_adbc_connection()
if conn is None:
raise RuntimeError("ADBC connection not available for this handler")
try:
return pd.read_sql(sql=_resolve_query_str(), con=conn, **kwargs)
finally:
with contextlib.suppress(Exception):
conn.close()
# Configure tenacity retry logic with exponential backoff
if skip_retry:
df: pd.DataFrame = _read_pandas_adbc() if use_adbc else _read_pandas(self.connection)
else:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries + 1),
wait=wait_exponential(
multiplier=self.exponential_multiplier,
min=self.exponential_min_retry_wait_time,
max=self.retry_wait_time,
),
retry=retry_if_exception_type(Exception),
reraise=True,
before_sleep=lambda retry_state: logger.exception(
f"Attempt {retry_state.attempt_number}: Could not execute query. Will wait {retry_state.next_action.sleep}s until next call. Arguments used {self.connection_properties}\n{query.as_string(self.connection) if isinstance(query, SQL | Composed) and isinstance(self._connection, pg.Connection) else query}",
),
)
for attempt in retryer:
with attempt:
# Reconnect after sleep (for retry attempts)
if attempt.retry_state.attempt_number > 1:
if use_adbc:
if self._adbc_connection is not None:
try:
self._adbc_connection.close()
except Exception: # noqa
pass
finally:
self._adbc_connection = None
else:
self.reconnect()
if use_adbc:
df: pd.DataFrame = _read_pandas_adbc()
else:
df: pd.DataFrame = _read_pandas(self.connection)
# ADBC binary COPY always returns TIMESTAMPTZ as UTC regardless of SET timezone;
# convert to the configured session timezone so behavior matches psycopg
if use_adbc and self._convert_time_zone:
target_tz = getattr(self.connection_properties, "time_zone", "UTC")
for col in df.columns:
if "duration" in str(df[col].dtype).lower() or "timedelta" in str(df[col].dtype).lower():
continue
if hasattr(df[col], "dt") and df[col].dt.tz is not None and str(df[col].dt.tz) == "UTC" and target_tz != "UTC":
df[col] = df[col].dt.tz_convert(target_tz)
# checking if we should remove the time zone
if remove_time_zone:
# iterating columns
for col in df.columns:
# checking if column has dt attribute
if "duration" in str(df[col].dtype).lower() or "timedelta" in str(df[col].dtype).lower():
continue
# sourcery skip: merge-nested-ifs
if hasattr(df[col], "dt"): # noqa: SIM102
# skipping if is duration
# checking if column is time zone aware
if df[col].dt.tz is not None:
# removing time zone
# here we have to convert to datetime64 because pyarrow has a bug that makes the final result stay at UTC
df[col] = (
df[col].astype(f"datetime64[{df[col].dt.unit}, {self.connection_properties.time_zone}]").dt.tz_localize(None)
)
# converting the DataFrame to the desired data types
if post_convert is not None:
df = df.convert_dtypes(dtype_backend=post_convert)
# doing any post-processing of the DataFrame
df = self._process_pandas_read(df, **kwargs)
self._auto_close_if_needed()
return df
read_to_polars(query, *args, use_adbc=False, **kwargs)
Read a SQL query into a Polars DataFrame.
Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection to execute the query and return the result as a DataFrame.
ADBC provides faster reads for large datasets by using Apache Arrow's columnar format natively, which is especially beneficial with Polars since it is Arrow-native. It opens a separate short-lived connection per query, so it cannot see session-scoped objects like temporary tables.
Parameters:
- query (SQL | Composed | str) – Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
- use_adbc (bool, default: False) – If True, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets). If False, uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed. Set to False when you need access to session state (temp tables) or when the handler does not support ADBC. By default False.
Other Parameters:
- schema_overrides (dict[str, DataTypeClass] | None) – A dictionary mapping column names to dtypes, used to override the schema inferred from the query. An example would be {"col1": pl.Int64, "col2": pl.Float64}.
- infer_schema_length (int | None) – The maximum number of rows to scan for schema inference. If set to None, the full data may be scanned. Defaults to None to avoid issues.
- remove_time_zone (bool) – If True (default), removes the time zone from time-zone-aware datetime columns, keeping the values in the time zone defined in the connection properties.
- skip_retry (bool) – If True, ignores the retry count and raises errors immediately.
Returns:
- DataFrame – DataFrame containing the results of the query.
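Illustrative usage with a schema override (the query and column names are placeholders):
import polars as pl

df = handler.read_to_polars(
    "SELECT asset_id, close_price FROM public.daily_prices",
    schema_overrides={"close_price": pl.Float64},
)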
Source code in echo_connhandler/sql_core.py
@validate_call
def read_to_polars(
self,
query: SQL | Composed | str,
*args, # noqa: ANN002, ARG002
use_adbc: bool = False,
**kwargs: Unpack[ReadToPolarsKwargs],
) -> pl.DataFrame:
"""Read a SQL query into a Polars DataFrame.
Uses either ADBC (Apache Arrow Database Connectivity) or the handler's native connection
to execute the query and return the result as a DataFrame.
ADBC provides faster reads for large datasets by using Apache Arrow's columnar format
natively, which is especially beneficial with Polars since it is Arrow-native. It opens a
separate short-lived connection per query, so it cannot see session-scoped objects like
temporary tables.
Parameters
----------
query : SQL | Composed | str
Query to be executed. Can be a SQL statement from psycopg.sql.SQL or psycopg.sql.Composed (preferable) or a string.
use_adbc : bool, optional
If ``True``, uses an ADBC connection for reading, enabling fast Arrow-native data transfer (good for large datasets).
If ``False``, uses the handler's standard database connection (e.g. psycopg for PostgreSQL), which is good for small datasets or when session state (temp tables) is needed.
Set to ``False`` when you need access to session state (temp tables) or when the handler does not support ADBC.
By default False.
Other Parameters
----------------
schema_overrides : dict[str, pl.DataTypeClass] | None
A dictionary mapping column names to dtypes, used to override the schema inferred from the query.
An example would be ``{"col1": pl.Int64, "col2": pl.Float64}``.
infer_schema_length : int | None, optional
The maximum number of rows to scan for schema inference. If set to ``None``, the full data may be scanned.
Defaults to ``None`` to avoid issues.
remove_time_zone : bool
If ``True`` (default), removes the time zone from time-zone-aware datetime columns,
keeping the values in the time zone defined in the connection properties.
skip_retry : bool
If ``True``, ignores the retry count and raises errors immediately.
Returns
-------
pl.DataFrame
DataFrame containing the results of the query.
"""
skip_retry = kwargs.pop("skip_retry") if "skip_retry" in kwargs else False
remove_time_zone = kwargs.pop("remove_time_zone", True)
# set infer_schema_length to None if not set
if "infer_schema_length" not in kwargs:
kwargs["infer_schema_length"] = None
def _resolve_query_str() -> str:
if isinstance(query, SQL | Composed):
if isinstance(self._connection, pg.Connection):
return query.as_string(self._connection)
return query.as_string(None)
if isinstance(query, str):
return query
raise TypeError(f"query must be of type psycopg.sql.SQL, psycopg.sql.Composed or string, not {type(query)}")
def _read_polars(conn: object) -> pl.DataFrame:
return pl.read_database(query=_resolve_query_str(), connection=conn, **kwargs)
def _read_polars_adbc() -> pl.DataFrame:
if self._cache_adbc_connection:
if self._adbc_connection is None:
self._adbc_connection = self._create_adbc_connection()
if self._adbc_connection is None:
raise RuntimeError("ADBC connection not available for this handler")
return pl.read_database(query=_resolve_query_str(), connection=self._adbc_connection, **kwargs)
conn = self._create_adbc_connection()
if conn is None:
raise RuntimeError("ADBC connection not available for this handler")
try:
return pl.read_database(query=_resolve_query_str(), connection=conn, **kwargs)
finally:
with contextlib.suppress(Exception):
conn.close()
# removing datetime columns from schema overrides to not mess up with time zone conversion later
datetime_schema_cols = None
if "schema_overrides" in kwargs and kwargs["schema_overrides"] is not None:
datetime_schema_cols = {
col: dtype
for col, dtype in kwargs["schema_overrides"].items()
# Catches the raw class OR an instantiated timezone-aware object
if dtype == pl.Datetime or isinstance(dtype, pl.Datetime)
}
if datetime_schema_cols:
for col in datetime_schema_cols:
kwargs["schema_overrides"].pop(col)
# Configure tenacity retry logic with exponential backoff
if skip_retry:
df: pl.DataFrame = _read_polars_adbc() if use_adbc else _read_polars(self.connection)
else:
retryer = Retrying(
stop=stop_after_attempt(self.max_retries + 1),
wait=wait_exponential(
multiplier=self.exponential_multiplier,
min=self.exponential_min_retry_wait_time,
max=self.retry_wait_time,
),
retry=retry_if_exception_type(Exception),
reraise=True,
before_sleep=lambda retry_state: logger.exception(
f"Attempt {retry_state.attempt_number}: Could not execute query. Will wait {retry_state.next_action.sleep}s until next call. Arguments used {self.connection_properties}\n{query.as_string(self.connection) if isinstance(query, SQL | Composed) and isinstance(self._connection, pg.Connection) else query}",
),
)
for attempt in retryer:
with attempt:
# Reconnect after sleep (for retry attempts)
if attempt.retry_state.attempt_number > 1:
if use_adbc:
if self._adbc_connection is not None:
try:
self._adbc_connection.close()
except Exception: # noqa
pass
finally:
self._adbc_connection = None
else:
self.reconnect()
if use_adbc:
df: pl.DataFrame = _read_polars_adbc()
else:
df: pl.DataFrame = _read_polars(self.connection)
# ADBC binary COPY always returns TIMESTAMPTZ as UTC regardless of SET timezone;
# convert to the configured session timezone so behavior matches psycopg
if use_adbc and self._convert_time_zone:
target_tz = getattr(self.connection_properties, "time_zone", "UTC")
if target_tz != "UTC":
tz_casts = [
pl.col(col_name).dt.convert_time_zone(target_tz)
for col_name, dtype in df.schema.items()
if isinstance(dtype, pl.Datetime) and dtype.time_zone == "UTC"
]
if tz_casts:
df = df.with_columns(tz_casts)
# removing time zone if requested
if remove_time_zone:
tz_strips = [
pl.col(col_name).dt.replace_time_zone(None)
for col_name, dtype in df.schema.items()
if isinstance(dtype, pl.Datetime) and dtype.time_zone is not None
]
if tz_strips:
df = df.with_columns(tz_strips)
# forcing schema for datetime columns if they were in the schema overrides, since we had to remove the time zone information from them for the previous steps
if datetime_schema_cols:
df = df.with_columns(
*[pl.col(col_name).cast(dtype) for col_name, dtype in datetime_schema_cols.items() if col_name in df.columns],
)
self._auto_close_if_needed()
return df
table_exists(schema, table_name)
abstractmethod
Method to check if a table, view, or materialized view exists in the database.
Parameters:
- schema (str | None) – Name of the schema where the table should be located.
- table_name (str) – Name of the table to check.
Returns:
- bool – True if the table exists, False otherwise.
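Illustrative usage on a concrete subclass (all names and column types are placeholders):
if not handler.table_exists(schema="public", table_name="daily_prices"):
    handler.create_table(schema="public", table_name="daily_prices", columns={"asset_id": "INTEGER"})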
Source code in echo_connhandler/sql_core.py
@abstractmethod
def table_exists(self, schema: str | None, table_name: str) -> bool:
"""Method to check if a table, view, or materialized view exists in the database.
Parameters
----------
schema : str | None
Name of the schema where the table should be located.
table_name : str
Name of the table to check.
Returns
-------
bool
True if the table exists, False otherwise.
"""
raise NotImplementedError("table_exists must be implemented in subclass")