diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 51a9980..2394c9b 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -45,6 +45,7 @@ jobs: pip install twine pip install camelx pip install pyyaml + pip install vzool-config pip install pytest==8.2.2 pip install pytest-runner==6.0.1 - name: Test with pytest diff --git a/zakat/zakat_tracker.py b/zakat/zakat_tracker.py index 966aa65..3d16557 100644 --- a/zakat/zakat_tracker.py +++ b/zakat/zakat_tracker.py @@ -2468,8 +2468,27 @@ def test(debug: bool = False) -> bool: class SQLiteDatabase: - def __init__(self, db_file): + """ + Represents a connection to a SQLite database. + + Provides methods for interacting with the database, such as executing SQL queries. + + Attributes: + conn (sqlite3.Connection): The SQLite connection object. + debug (bool): Whether to enable debug mode (default: False). + cursor (sqlite3.Cursor): The SQLite cursor object. + """ + + def __init__(self, db_file: str, debug: bool = False): + """ + Initializes a new SQLiteDatabase instance. + + Parameters: + db_file (str): The path to the SQLite database file. + debug (bool, optional): Whether to enable debug mode. Defaults to False. + """ self.conn = sqlite3.connect(db_file) + self.debug = debug self.cursor = self.conn.cursor() def sql(self, query): @@ -2480,7 +2499,7 @@ def sql(self, query): print(f'Exception: {e}') print(f'SQL: {query}') - def create_table(self, table_name, columns): + def create_table(self, table_name: str, columns: dict): """Creates a table with specified columns.""" column_str = ", ".join([f"{col} {data_type}" for col, data_type in columns.items()]) query = f"CREATE TABLE IF NOT EXISTS {table_name} ({column_str})" @@ -2491,31 +2510,37 @@ def create_table(self, table_name, columns): print(f'Exception: {e}') print(f'SQL: {query}') - def insert(self, table_name, data): + def insert(self, table_name: str, data: dict) -> dict: """Inserts data into a table.""" columns = ", ".join(data.keys()) values = ", ".join(["?" for _ in data.values()]) query = f"INSERT INTO {table_name} ({columns}) VALUES ({values})" + if self.debug: + print(f'SQL: {query}') self.cursor.execute(query, tuple(data.values())) self.conn.commit() + data['id'] = self.cursor.lastrowid + return data - def select(self, table_name, columns=None, where=None): + def select(self, table_name: str, columns: list[str] = None, where: str = None) -> list: """Selects data from a table.""" columns = "*" if columns is None else ", ".join(columns) query = f"SELECT {columns} FROM {table_name}" if where: query += f" WHERE {where}" + if self.debug: + print(f'SQL: {query}') self.cursor.execute(query) return self.cursor.fetchall() - def update(self, table_name, data, where): + def update(self, table_name: str, data: dict, where: str): """Updates data in a table.""" set_clause = ", ".join([f"{col} = ?" for col in data]) query = f"UPDATE {table_name} SET {set_clause} WHERE {where}" self.cursor.execute(query, tuple(data.values()) + (where,)) self.conn.commit() - def delete(self, table_name, where): + def delete(self, table_name: str, where: str): """Deletes data from a table.""" query = f"DELETE FROM {table_name} WHERE {where}" self.cursor.execute(query) @@ -2534,19 +2559,26 @@ class SQLiteModel(Model): It may offer features like automatic mapping to database columns, validation, and query building. """ - def __init__(self, db_path: str = "./zakat_db/zakat.sqlite", history_mode: bool = True): - self._db = None + def __init__(self, db_path: str = "./zakat_db/zakat.sqlite", history_mode: bool = True, debug: bool = False): + """ + Initializes a new SQLiteModel instance. + + Parameters: + db_path (str, optional): The path to the SQLite database file. Defaults to "./zakat_db/zakat.sqlite". + history_mode (bool, optional): Whether to enable history mode. Defaults to True. + debug (bool, optional): Whether to enable debug mode. Defaults to False. + """ self._base_path = None self._vault_path = None self._history_mode = None self.path(db_path) + self.db = SQLiteDatabase(db_file=self.path(), debug=debug) self.create_db() self.config = ConfigManager(self.path()) self.history(history_mode) def create_db(self): - self._db = SQLiteDatabase(self.path()) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS account( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, @@ -2559,7 +2591,7 @@ def create_db(self): CONSTRAINT account_name_uk UNIQUE(name) ); """) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS box( id INTEGER PRIMARY KEY AUTOINCREMENT, account_id INTEGER NOT NULL, @@ -2575,7 +2607,7 @@ def create_db(self): CONSTRAINT box_account_id_fk FOREIGN KEY(account_id) REFERENCES account(id) ); """) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS log( id INTEGER PRIMARY KEY AUTOINCREMENT, account_id INTEGER NOT NULL, @@ -2588,7 +2620,7 @@ def create_db(self): CONSTRAINT log_account_id_fk FOREIGN KEY(account_id) REFERENCES account(id) ); """) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS file( id INTEGER PRIMARY KEY AUTOINCREMENT, log_id INTEGER NOT NULL, @@ -2600,7 +2632,7 @@ def create_db(self): CONSTRAINT file_log_id_fk FOREIGN KEY(log_id) REFERENCES log(id) ); """) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS exchange( id INTEGER PRIMARY KEY AUTOINCREMENT, account_id INTEGER NOT NULL, @@ -2612,7 +2644,7 @@ def create_db(self): CONSTRAINT exchange_account_id_fk FOREIGN KEY(account_id) REFERENCES account(id) ); """) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS action( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, @@ -2621,14 +2653,14 @@ def create_db(self): ); """) actions = [(x.value, x.name) for x in Action] - if len(self._db.select('action')) != len(actions): - self._db.sql(""" + if len(self.db.select('action')) != len(actions): + self.db.sql(""" DELETE FROM action; """) print('xxx', actions) for ref, name in actions: - self._db.insert('action', {'id': ref, 'name': name}) - self._db.sql(""" + self.db.insert('action', {'id': ref, 'name': name}) + self.db.sql(""" CREATE TABLE IF NOT EXISTS math( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, @@ -2637,14 +2669,14 @@ def create_db(self): ); """) maths = [(x.value, x.name) for x in MathOperation] - if len(self._db.select('math')) != len(maths): - self._db.sql(""" + if len(self.db.select('math')) != len(maths): + self.db.sql(""" DELETE FROM math; """) print('xxx', maths) for ref, name in maths: - self._db.insert('math', {'id': ref, 'name': name}) - self._db.sql(""" + self.db.insert('math', {'id': ref, 'name': name}) + self.db.sql(""" CREATE TABLE IF NOT EXISTS history( id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER NOT NULL, @@ -2661,7 +2693,7 @@ def create_db(self): CONSTRAINT history_account_id_fk FOREIGN KEY(account_id) REFERENCES account(id) ); """) - self._db.sql(""" + self.db.sql(""" CREATE TABLE IF NOT EXISTS report( id INTEGER PRIMARY KEY AUTOINCREMENT, time INTEGER NOT NULL, @@ -2671,9 +2703,6 @@ def create_db(self): ); """) - def db(self): - return self._db - def path(self, path: str = None) -> str: if path is None: return self._vault_path @@ -2724,7 +2753,15 @@ def exchanges(self, account: int) -> dict | None: pass def account(self, name: str, ref: int = None) -> tuple[int, str]: - pass + account = self.db.select( + table_name='account', + columns=['id', 'name'], + where=f'name = "{name}"', + ) + if account: + return account[0]['id'], account[0]['name'] + account = self.db.insert(table_name='account', data={'name': name}) + return account['id'], account['name'] def transfer(self, unscaled_amount: float | int | Decimal, from_account: int, to_account: int, desc: str = '', created: int = None, debug: bool = False) -> list[int]: