SQLite¶
SqlLiteConnProperties(path, overwrite=False, autocommit=True)
dataclass
¶
Class that holds the properties used to establish a connection to a SQLite database.
Parameters:
-
path
¶str
) –The path to the SQLite database file.
-
overwrite
¶bool
, default:False
) –If set to True, the database file will be overwritten if it already exists. By default False.
-
autocommit
¶bool
, default:True
) –If set to True, connection will be committed automatically after every transaction. By default True.
__post_init__()
¶
Method that checks if inputs are valid after initialization.
Source code in echo_connhandler/sql_lite.py
def __post_init__(self) -> None:
    """Validate the connection properties right after the dataclass is initialized.

    Raises
    ------
    TypeError
        If ``path`` is not a str, or if ``overwrite`` or ``autocommit`` is not a bool.
    ValueError
        If ``path`` does not end with ".db".
    """
    # the path must be a string ...
    path_is_str = isinstance(self.path, str)
    if not path_is_str:
        raise TypeError(f"path must be of type str, not {type(self.path)}")

    # ... and must point to a file carrying the ".db" extension
    has_db_extension = self.path.endswith(".db")
    if not has_db_extension:
        raise ValueError(f"path must end with .db, not {self.path}")

    # both flags must be real booleans (truthy/falsy stand-ins are rejected)
    if isinstance(self.overwrite, bool) is False:
        raise TypeError(f"overwrite must be of type bool, not {type(self.overwrite)}")
    if isinstance(self.autocommit, bool) is False:
        raise TypeError(f"autocommit must be of type bool, not {type(self.autocommit)}")
SqlLiteHandler(connection_properties, max_retries=1, retry_wait_time=1, skip_connect=False, **kwargs)
¶
Subclass of SqlHandler used for SQLite.
This already connects to the SQL server.
Parameters:
-
connection_properties
¶SqlLiteConnProperties
) –Object containing connection parameters.
-
max_retries
¶int
, default:1
) –Number of retries that will be attempted when reconnecting or doing queries, by default 1
-
retry_wait_time
¶float
, default:1
) –Wait time in seconds between each connection or query retry, by default 1
-
skip_connect
¶bool
, default:False
) –If True, the connection will not be established when the object is created.
If this is set to True, the user will need to manually call the reconnect() method when they want to connect to the server.
By default False
-
**kwargs
¶dict
, default:{}
) –Just kept here for compatibility.
Source code in echo_connhandler/sql_lite.py
@validate_call
def __init__(
    self,
    connection_properties: SqlLiteConnProperties,
    max_retries: int = 1,
    retry_wait_time: float = 1,
    skip_connect: bool = False,
    **kwargs,  # pylint: disable=unused-argument # noqa
) -> None:
    """Initialize the SQLite handler by delegating to the parent initializer.

    According to the original documentation, the connection is established at this point unless
    ``skip_connect`` is True.

    Parameters
    ----------
    connection_properties : SqlLiteConnProperties
        Object containing connection parameters.
    max_retries : int, optional
        Number of retries that will be attempted when reconnecting or doing queries, by default 1
    retry_wait_time : float, optional
        Wait time in seconds between each connection or query retry, by default 1
    skip_connect : bool, optional
        If True, the connection will not be established when the object is created.
        If this is set to True, the user will need to manually call the reconnect() method when they want to
        connect to the server.
        By default False
    **kwargs : dict, optional
        Just kept here for compatibility. These are not forwarded to the parent initializer.
    """
    # only the named arguments are handed to the parent class; extra kwargs are dropped on purpose
    parent_arguments = {
        "connection_properties": connection_properties,
        "max_retries": max_retries,
        "retry_wait_time": retry_wait_time,
        "skip_connect": skip_connect,
    }
    super().__init__(**parent_arguments)
create_table(table_name, columns, pkey_cols=None, if_exists='fail', temporary=False, **kwargs)
¶
Method to create a table in the database.
Parameters:
-
table_name
¶str
) –Name of the table to be created.
-
columns
¶dict[str, str]
) –Dict in the format {column_name: column_type, ...}.
-
pkey_cols
¶list[str] | None
, default:None
) –List of columns to be set as primary key. If None, no primary key will be set. By default None.
-
if_exists
¶Literal['fail', 'replace']
, default:'fail'
) –What to do if the table already exists.
- fail: Will raise a ValueError.
- replace: Will drop the table and create a new one.
By default "fail".
-
temporary
¶bool
, default:False
) –If set to True, the table will be created as temporary. By default False.
-
**kwargs
¶Used for compatibility. Should be ignored.
Source code in echo_connhandler/sql_lite.py
@validate_call
def create_table(
    self,
    table_name: str,
    columns: dict[str, str],
    pkey_cols: list[str] | None = None,
    if_exists: Literal["fail", "replace"] = "fail",
    temporary: bool = False,
    **kwargs,  # noqa
) -> None:
    """Method to create a table in the database.

    Every argument is validated before the database is modified, so an invalid call can no longer drop an
    existing table (``if_exists="replace"``) and then fail to recreate it.

    Parameters
    ----------
    table_name : str
        Name of the table to be created.
    columns : dict[str, str]
        Dict in the format {column_name: column_type, ...}. Must not be empty.
        The column types are inserted into the statement verbatim, so they must come from a trusted source.
    pkey_cols : list[str] | None, optional
        List of columns to be set as primary key. If None (or empty), no primary key will be set. By default None.
    if_exists : Literal["fail", "replace"], optional
        What to do if the table already exists.
        - fail: Will raise a ValueError.
        - replace: Will drop the table and create a new one.
        By default "fail".
    temporary : bool, optional
        If set to True, the table will be created as temporary. By default False.
    **kwargs
        Used for compatibility. Should be ignored.

    Raises
    ------
    TypeError
        If table_name, columns, pkey_cols or temporary have an invalid type.
    ValueError
        If if_exists is not one of the accepted values, if columns is empty, or if the table already exists
        and if_exists is "fail".
    """

    def _quote(identifier: str) -> str:
        # wraps the identifier in double quotes, doubling embedded quotes so the name cannot end early
        return '"' + identifier.replace('"', '""') + '"'

    # checking if if_exists is valid
    if if_exists not in ["fail", "replace"]:
        raise ValueError(f"if_exists must be one of ['fail', 'replace'], not {if_exists}")
    # checking if table_name is valid
    if not isinstance(table_name, str):
        raise TypeError(f"table_name must be of type str, not {type(table_name)}")
    # checking if pkey_cols is valid
    if pkey_cols is not None and not (isinstance(pkey_cols, list) and all(isinstance(col, str) for col in pkey_cols)):
        raise TypeError(f"pkey_cols must be of type list[str], not {type(pkey_cols)}")
    # checking if temporary is valid
    if not isinstance(temporary, bool):
        raise TypeError(f"temporary must be of type bool, not {type(temporary)}")
    # checking if columns is valid; done before anything is dropped so "replace" never loses a table
    if not isinstance(columns, dict):
        raise TypeError(f"columns must be of type dict, not {type(columns)}")
    if not columns:
        # an empty column list would produce an invalid "CREATE TABLE name ( )" statement
        raise ValueError("columns must contain at least one column.")

    # checking if table already exists
    table_exists = table_name in self.get_table_names()
    if table_exists and if_exists == "fail":
        raise ValueError(f"Table {table_name} already exists. If you want to replace it, set if_exists to 'replace'.")

    quoted_table = sql.SQL(_quote(table_name))

    # dropping table if it already exists (if_exists can only be "replace" at this point)
    if table_exists:
        logger.debug(f"Table '{table_name}' already exists. Dropping it.")
        self.execute(query=sql.SQL("DROP TABLE {table_name}").format(table_name=quoted_table))

    # optional primary key clause
    if pkey_cols:
        p_key = sql.SQL(", PRIMARY KEY ({pkey_cols})").format(
            pkey_cols=sql.SQL(", ").join(sql.SQL(_quote(col)) for col in pkey_cols),
        )
    else:
        p_key = sql.SQL("")

    # creating the table
    query = sql.SQL("CREATE {temp_table} TABLE {table_name} ({table_cols} {p_key});").format(
        temp_table=sql.SQL("TEMP") if temporary else sql.SQL(""),
        table_name=quoted_table,
        table_cols=sql.SQL(", ").join(sql.SQL(f"{_quote(col)} {sql_dtype}") for col, sql_dtype in columns.items()),
        p_key=p_key,
    )
    self.execute(query=query)
    logger.debug(f"Table '{table_name}' created with columns {columns} and primary key {pkey_cols}.")
delete_table(table_name, on_error='fail', **kwargs)
¶
Method to delete a table in the database.
Parameters:
-
table_name
¶str
) –Name of the table to be deleted.
-
on_error
¶Literal['fail', 'ignore']
, default:'fail'
) –What to do if the table does not exist. - fail: Will raise a ValueError. - ignore: Will ignore the error. By default "fail".
-
**kwargs
¶Used for compatibility. Should be ignored.
Source code in echo_connhandler/sql_lite.py
@validate_call
def delete_table(
    self,
    table_name: str,
    on_error: Literal["fail", "ignore"] = "fail",
    **kwargs,  # noqa
) -> None:
    """Method to delete a table in the database.

    The DROP statement is attempted up to three times; after each attempt the list of tables is read
    again to confirm that the table is really gone.

    Parameters
    ----------
    table_name : str
        Name of the table to be deleted.
    on_error : Literal["fail", "ignore"], optional
        What to do if the table does not exist.
        - fail: Will raise a ValueError.
        - ignore: Will ignore the error.
        By default "fail".
    **kwargs
        Used for compatibility. Should be ignored.

    Raises
    ------
    TypeError
        If table_name is not a str.
    ValueError
        If on_error is not one of the accepted values, or if the table does not exist and on_error is "fail".
    RuntimeError
        If the table still exists after all the drop attempts.
    """
    # checking if arguments are valid
    if not isinstance(table_name, str):
        raise TypeError(f"table_name must be of type str, not {type(table_name)}")
    if on_error not in ["fail", "ignore"]:
        raise ValueError(f"on_error must be one of ['fail', 'ignore'], not {on_error}")

    # checking if table exists
    if table_name not in self.get_table_names():
        if on_error == "fail":
            raise ValueError(f"Table {table_name} does not exist.")
        return

    # deleting the table; embedded double quotes are doubled so the quoted identifier cannot end early
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    query = sql.SQL("DROP TABLE {table_name}").format(table_name=sql.SQL(quoted_table))

    max_attempts = 3
    for _ in range(max_attempts):
        self.execute(query=query)
        # one lookup per attempt is enough to know whether the drop took effect
        if table_name not in self.get_table_names():
            return

    raise RuntimeError(f"Could not delete table {table_name}")
get_table_columns(table_names, **kwargs)
¶
Method to get the columns in a table and its data types.
Parameters:
-
table_names
¶list[str]
) –Name of the desired tables.
-
**kwargs
¶Used for compatibility. Should be ignored.
Returns:
-
dict[str, dict[str, str]]
–Dict in the format {table_name: {column_name: column_type, ...}, ...}.
Source code in echo_connhandler/sql_lite.py
@validate_call
def get_table_columns(self, table_names: list[str], **kwargs) -> dict[str, dict[str, str]]:  # noqa
    """Method to get the columns of the given tables and their data types.

    Parameters
    ----------
    table_names : list[str]
        Name of the desired tables.
    **kwargs
        Used for compatibility. Should be ignored.

    Returns
    -------
    dict[str, dict[str, str]]
        Dict in the format {table_name: {column_name: column_type, ...}, ...}.

    Raises
    ------
    TypeError
        If table_names is not a list of str.
    ValueError
        If any of the requested tables does not exist.
    """
    # validating the argument
    if not isinstance(table_names, list):
        raise TypeError(f"table_names must be of type list, not {type(table_names)}")
    for table in table_names:
        if not isinstance(table, str):
            raise TypeError(f"table_names must be a list of str, not {table_names}")

    # every requested table must be present in the database
    missing_tables = set(table_names) - set(self.get_table_names())
    if missing_tables:
        raise ValueError(f"Table(s) {missing_tables} do(es) not exist.")

    def _column_types(table: str) -> dict[str, str]:
        # one PRAGMA table_info call per table; its "name" and "type" columns hold what we need
        pragma_query = sql.SQL("PRAGMA table_info({})").format(sql.Literal(table))
        info = self.read_to_pandas(query=pragma_query)
        return info.set_index("name").to_dict()["type"]

    return {table: _column_types(table) for table in table_names}
get_table_names(table_types=None, **kwargs)
¶
Method to get the names of all tables in the database.
Parameters:
-
table_types
¶list[Literal['table', 'view']] | None
, default:None
) –List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to ["table"], by default None.
-
**kwargs
¶Used for compatibility. Should be ignored.
Returns:
-
list[str]
–List of table names.
Source code in echo_connhandler/sql_lite.py
@validate_call
def get_table_names(self, table_types: list[Literal["table", "view"]] | None = None, **kwargs) -> list[str]:  # noqa
    """Method to get the names of the tables (and optionally views) in the database.

    Parameters
    ----------
    table_types : list[Literal["table", "view"]] | None, optional
        List of table types to be returned. Can be one of ["table", "view"]. If set to None, will be equal to
        ["table"], by default None.
    **kwargs
        Used for compatibility. Should be ignored.

    Returns
    -------
    list[str]
        List of table names, read from the "name" column of sqlite_master.

    Raises
    ------
    ValueError
        If table_types contains anything other than "table" or "view".
    """
    # falling back to plain tables when nothing was requested
    table_types = ["table"] if table_types is None else table_types

    # only the object types known to this method are accepted
    accepted_types = ["table", "view"]
    if not all(table_type in accepted_types for table_type in table_types):
        raise ValueError(f"table_types must be a subset of ['table', 'view'], not {table_types}.")

    # each requested type becomes one literal inside the IN (...) list
    type_literals = sql.SQL(", ").join(sql.Literal(table_type) for table_type in table_types)
    query = sql.SQL("SELECT name FROM sqlite_master WHERE type IN ({})").format(type_literals)

    names = self.read_to_pandas(query=query)["name"]
    return names.to_list()
get_table_pkey(table_names, return_type='columns', **kwargs)
¶
Method to get the primary key of a table.
Can either return the name of the primary key or the columns associated with the primary key (depending on the type of SQL)
Parameters:
-
schema
¶str
) –Name of the schema where the tables are located. Not applicable to SQLite: this method does not accept a schema argument.
-
table_names
¶list[str]
) –Name of the desired tables.
-
return_type
¶Literal['columns']
, default:'columns'
) –For SQLite will always return the columns associated with the primary key. By default "columns".
-
**kwargs
¶Used for compatibility. Should be ignored.
Returns:
-
dict[str, list[str]]
–Returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}.
Source code in echo_connhandler/sql_lite.py
@validate_call
def get_table_pkey(self, table_names: list[str], return_type: Literal["columns"] = "columns", **kwargs) -> dict[str, list[str]]:  # noqa
    """Method to get the primary key columns of the given tables.

    Parameters
    ----------
    table_names : list[str]
        Name of the desired tables.
    return_type : Literal["columns"], optional
        For SQLite will always return the columns associated with the primary key. The value is not used by
        this method. By default "columns".
    **kwargs
        Used for compatibility. Should be ignored.

    Returns
    -------
    dict[str, list[str]]
        Returns a dict in the format {table_name: [pk_col1, pk_col2, ...], ...}.
        Columns are listed following their position inside the primary key. Tables without a primary key
        map to an empty list.

    Raises
    ------
    ValueError
        If any of the requested tables does not exist.
    """
    existing_tables = self.get_table_names()

    table_pkey = {}
    for table in table_names:
        if table not in existing_tables:
            raise ValueError(f"Table '{table}' does not exist.")

        # The table name is passed as a string literal (same approach used for PRAGMA table_info in
        # get_table_columns) instead of a double-quoted token, which SQLite only treats as a string as a
        # compatibility fallback.
        # "pk" is 0 for non-key columns and otherwise the 1-based position of the column within the key,
        # so ordering by it returns composite keys in their declared key order.
        query = sql.SQL(
            "SELECT l.name FROM pragma_table_info({table_name}) as l WHERE l.pk <> 0 ORDER BY l.pk",
        ).format(
            table_name=sql.Literal(table),
        )
        table_pkey[table] = self.read_to_pandas(query=query)["name"].to_list()

    return table_pkey
pandas_to_sql(df, table_name, if_exists='fail', ignore_index=False, index_name=None, pkey_cols=None, conflict_cols=None, **kwargs)
¶
Method to write a Pandas DataFrame to a SQL table.
This overrides the base class method as SQLite does not need a schema parameter.
Parameters:
-
df
¶DataFrame
) –DataFrame to be written.
-
table_name
¶str
) –Name of the table to be written to.
-
if_exists
¶Literal['fail', 'replace', 'append', 'update', 'skip_row_check']
, default:'fail'
) –What to do if the table already exists.
- fail: Will raise a ValueError.
- replace: Will drop the table and write the DataFrame to a new table.
- append: Will append the DataFrame to the existing table, skipping rows that already exist.
- update: Will update the existing table with the DataFrame. This is the same as append but will update rows that already exist.
- skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table. It assumes no insert conflicts.
By default "fail".
-
ignore_index
¶bool
, default:False
) –Whether to ignore the index of the DataFrame, by default False.
-
index_name
¶str | None
, default:None
) –Name to set the index to. If None, the index will not be renamed. By default None.
-
pkey_cols
¶list[str] | None
, default:None
) –Name of the columns to set as primary key. If None, no primary key will be set. If is the same name as the index, index will be used.
This is only applicable when replace is used or if the table does not exist.
By default None.
-
conflict_cols
¶list[str] | None
, default:None
) –List of columns to be used to check for conflict instead of the primary key. If not set, the primary key will be used. By default None.
-
**kwargs
¶Used for backwards compatibility. Should be ignored.
Source code in echo_connhandler/sql_lite.py
@validate_call
def pandas_to_sql(
    self,
    df: pd.DataFrame,
    table_name: str,
    if_exists: Literal["fail", "replace", "append", "update", "skip_row_check"] = "fail",
    ignore_index: bool = False,
    index_name: str | None = None,
    pkey_cols: list[str] | None = None,
    conflict_cols: list[str] | None = None,
    **kwargs,
) -> None:
    """Method to write a Pandas DataFrame to a SQL table.

    This overrides the base class method as SQLite does not need a schema parameter: the DataFrame is
    pre-processed with ``schema=None`` and, if anything is left to write, handed to the writing routine.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to be written.
    table_name : str
        Name of the table to be written to.
    if_exists : Literal["fail", "replace", "append", "update", "skip_row_check"], optional
        What to do if the table already exists.
        - fail: Will raise a ValueError.
        - replace: Will drop the table and write the DataFrame to a new table.
        - append: Will append the DataFrame to the existing table, skipping rows that already exist.
        - update: Will update the existing table with the DataFrame. This is the same as append but will update
          rows that already exist.
        - skip_row_check: Will skip the check if the rows already exist and write the DataFrame to the table.
          It assumes no insert conflicts.
        By default "fail".
    ignore_index : bool, optional
        Whether to ignore the index of the DataFrame, by default False.
    index_name : str | None, optional
        Name to set the index to. If None, the index will not be renamed. By default None.
    pkey_cols : list[str] | None, optional
        Name of the columns to set as primary key. If None, no primary key will be set. If is the same name as
        the index, index will be used.
        This is only applicable when replace is used or if the table does not exist.
        By default None.
    conflict_cols : list[str] | None, optional
        List of columns to be used to check for conflict instead of the primary key. If not set, the primary
        key will be used.
        By default None.
    **kwargs
        Used for backwards compatibility. Forwarded to both the pre-processing and the writing step.
    """
    # start of the timing reported in the final debug message
    t1 = perf_counter()

    # pre-processing (SQLite has no schema, so None is passed)
    df = self._pre_df_to_sql(
        df=df,
        schema=None,
        table_name=table_name,
        if_exists=if_exists,
        ignore_index=ignore_index,
        index_name=index_name,
        pkey_cols=pkey_cols,
        conflict_cols=conflict_cols,
        **kwargs,
    )

    if not df.empty:
        # there is data to write
        self._pandas_to_sql(
            df=df,
            table_name=table_name,
            if_exists=if_exists,
            conflict_cols=conflict_cols,
            **kwargs,
        )
        logger.debug(
            f"DataFrame of shape {df.shape} written to table {table_name} in {perf_counter() - t1:.2f} seconds.",
        )
    else:
        # nothing left after pre-processing
        logger.debug("DataFrame is empty. Nothing to do.")