A Python library for database connections and ORM queries with support for multiple database engines including SQLite, PostgreSQL, MySQL, MSSQL, Oracle, and MongoDB.
- Installation
- Features
- Database Classes
- Database Engines
- Database Utilities
- Development
- License
- Support
- Multiple Database Support: SQLite, PostgreSQL, MySQL, MSSQL, Oracle, and MongoDB
- Sync and Async Support: Both synchronous and asynchronous operations
- Environment Configuration: Optional parameters with .env file fallback
- SQLAlchemy Integration: Built on top of SQLAlchemy ORM
- Connection Pooling: Configurable connection pooling for better performance
Synchronous Sessions:
autoflush = True
expire_on_commit = True
echo = False
Asynchronous Sessions:
autoflush = True
expire_on_commit = False
echo = False
Note: All constructor parameters are optional and fall back to .env file variables.
pip install ddcDatabases
Note: The basic installation includes only SQLite. Database-specific drivers are optional extras that you can install as needed.
Install only the database drivers you need:
# All database drivers (recommended for development)
pip install ddcDatabases[all]
# SQL Server / MSSQL
pip install ddcDatabases[mssql]
# MySQL / MariaDB
pip install ddcDatabases[mysql]
# PostgreSQL
pip install ddcDatabases[pgsql]
# Oracle Database
pip install ddcDatabases[oracle]
# MongoDB
pip install ddcDatabases[mongodb]
# Multiple databases (example)
pip install ddcDatabases[mysql,pgsql,mongodb]
Available Database Extras:
- all - All database drivers
- mssql - Microsoft SQL Server (pyodbc, aioodbc)
- mysql - MySQL/MariaDB (pymysql, aiomysql)
- pgsql - PostgreSQL (psycopg2-binary, asyncpg)
- oracle - Oracle Database (cx-oracle)
- mongodb - MongoDB (pymongo)
Platform Notes:
- SQLite support is included by default (no extra installation required)
- PostgreSQL extras may have compilation requirements on some systems
- All extras support both synchronous and asynchronous operations where applicable
class Sqlite(
filepath: Optional[str] = None,
echo: Optional[bool] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
extra_engine_args: Optional[dict] = None,
)
Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, Sqlite
from your_models import User # Your SQLAlchemy model
with Sqlite() as session:
db_utils = DBUtils(session)
stmt = sa.select(User).where(User.id == 1)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
class MSSQL(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
schema: Optional[str] = None,
echo: Optional[bool] = None,
pool_size: Optional[int] = None,
max_overflow: Optional[int] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
extra_engine_args: Optional[dict] = None,
)
Synchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, MSSQL
from your_models import User
with MSSQL() as session:
stmt = sa.select(User).where(User.id == 1)
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Asynchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, MSSQL
from your_models import User
async def main():
async with MSSQL() as session:
stmt = sa.select(User).where(User.id == 1)
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
class PostgreSQL(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
echo: Optional[bool] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
engine_args: Optional[dict] = None,
)
Synchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, PostgreSQL
from your_models import User
with PostgreSQL() as session:
stmt = sa.select(User).where(User.id == 1)
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
Asynchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtilsAsync, PostgreSQL
from your_models import User
async def main():
async with PostgreSQL() as session:
stmt = sa.select(User).where(User.id == 1)
db_utils = DBUtilsAsync(session)
results = await db_utils.fetchall(stmt)
for row in results:
print(row)
Synchronous Example:
import sqlalchemy as sa
from ddcDatabases import DBUtils, MySQL
with MySQL() as session:
stmt = sa.text("SELECT * FROM users WHERE id = :user_id")
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt, {"user_id": 1})
for row in results:
print(row)
class Oracle(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
servicename: Optional[str] = None,
echo: Optional[bool] = None,
autoflush: Optional[bool] = None,
expire_on_commit: Optional[bool] = None,
extra_engine_args: Optional[dict] = None,
)
Example with explicit credentials:
import sqlalchemy as sa
from ddcDatabases import DBUtils, Oracle
credentials = {
"host": "127.0.0.1",
"user": "system",
"password": "oracle",
"servicename": "xe",
"echo": False,
}
with Oracle(**credentials) as session:
stmt = sa.text("SELECT * FROM dual")
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt)
for row in results:
print(row)
class MongoDB(
host: Optional[str] = None,
port: Optional[int] = None,
user: Optional[str] = None,
password: Optional[str] = None,
database: Optional[str] = None,
batch_size: Optional[int] = None,
limit: Optional[int] = None,
)
Example with explicit credentials:
from ddcDatabases.mongodb import MongoDB
from bson.objectid import ObjectId
credentials = {
"host": "127.0.0.1",
"user": "admin",
"password": "admin",
"database": "admin",
}
with MongoDB(**credentials) as mongodb:
query = {"_id": ObjectId("6772cf60f27e7e068e9d8985")}
collection = "movies"
with mongodb.cursor(collection, query) as cursor:
for document in cursor:
print(document)
Access the underlying SQLAlchemy engine for advanced operations:
Synchronous Engine:
from ddcDatabases import PostgreSQL
with PostgreSQL() as session:
engine = session.bind
# Use engine for advanced operations
Asynchronous Engine:
from ddcDatabases import PostgreSQL
async def main():
async with PostgreSQL() as session:
engine = session.bind
# Use engine for advanced operations
The DBUtils and DBUtilsAsync classes provide convenient methods for common database operations:
from ddcDatabases import DBUtils, DBUtilsAsync
# Synchronous utilities
db_utils = DBUtils(session)
results = db_utils.fetchall(stmt) # Returns list of RowMapping objects
value = db_utils.fetchvalue(stmt) # Returns single value as string
db_utils.insert(stmt) # Insert into model table
db_utils.deleteall(model) # Delete all records from model
db_utils.insertbulk(model, data_list) # Bulk insert from list of dictionaries
db_utils.execute(stmt) # Execute any SQLAlchemy statement
# Asynchronous utilities (similar interface with await)
db_utils_async = DBUtilsAsync(session)
results = await db_utils_async.fetchall(stmt)
poetry build -f wheel
poetry update --with test
poe tests
Released under the MIT License
If you find this project helpful, consider supporting development: