CRUD Four Ways
Implement the exact same tasks CRUD — create, get, list, update,
delete — four different ways against one Postgres table, and feel the
trade-offs in your fingers. Raw asyncpg, SQLAlchemy 2.0 Core, SQLAlchemy 2.0 ORM,
and SQLModel. Same table, same five operations, four very different code shapes.
By the end you’ll have an opinion about which one you’d reach for, and why.
This is the Python version of a debate every backend dev has had: hand-written
SQL (pgx in Go, postgres.js in Node) vs. a typed query builder (Knex, Kysely)
vs. a full ORM (GORM, Prisma, TypeORM).
What you’ll practice
Section titled “What you’ll practice”- A
uvproject withasyncpg,sqlalchemy,sqlmodel, andalembic. - The same async CRUD written four ways against one pool/engine.
- An Alembic async migration that creates the
taskstable. async withtransactions and session-per-operation discipline.- Running everything with
uv run, formatted withruff, typed withty.
Requirements
Section titled “Requirements”- Shared Postgres 17 running (
docker compose up -d postgresinshared-infra). - One table:
tasks(id, title, done, created_at). - One Alembic migration that creates it.
- Four modules —
crud_asyncpg.py,crud_core.py,crud_orm.py,crud_sqlmodel.py— each exposingcreate / get / list_tasks / update / delete. - A
main.pythat runs the same five operations through each module so you can diff the output.
The worked solution
Section titled “The worked solution”Directoryfour-ways-crud/
- pyproject.toml uv project, deps, ruff/ty config
- alembic.ini Alembic config
Directoryalembic/
- env.py async migration environment
- versions/0001_create_tasks.py the migration
Directorysrc/app/
- init .py
- models.py SQLAlchemy ORM
Base+Task(also Alembic’s target metadata) - db.py async engine +
async_sessionmaker - crud_asyncpg.py way 1 — raw asyncpg pool
- crud_core.py way 2 — SQLAlchemy Core expression language
- crud_orm.py way 3 — SQLAlchemy 2.0 ORM,
AsyncSession - crud_sqlmodel.py way 4 — SQLModel
- main.py runs all four
Project setup
Section titled “Project setup”uv init four-ways-crudcd four-ways-cruduv add asyncpg sqlalchemy sqlmodel alembicuv add --dev ruff ty[project]name = "four-ways-crud"version = "0.1.0"requires-python = ">=3.13"dependencies = [ "alembic>=1.14", "asyncpg>=0.30", "sqlalchemy[asyncio]>=2.0.36", "sqlmodel>=0.0.22",]
[dependency-groups]dev = ["ruff>=0.8", "ty>=0.0.1a1"]
[tool.ruff]line-length = 100The shared table
Section titled “The shared table”One migration creates the table. The ORM model below is also Alembic’s autogenerate target, so the schema has a single source of truth.
"""create tasks table"""import sqlalchemy as safrom alembic import op
revision = "0001"down_revision = None
def upgrade() -> None: op.create_table( "tasks", sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True), sa.Column("title", sa.String(length=255), nullable=False), sa.Column("done", sa.Boolean(), server_default=sa.false(), nullable=False), sa.Column( "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False, ), )
def downgrade() -> None: op.drop_table("tasks")The async Alembic environment (from alembic init -t async alembic) just needs
its metadata and URL wired up:
# ... template boilerplate above ...from app.models import Base
target_metadata = Base.metadataconfig.set_main_option( "sqlalchemy.url", "postgresql+asyncpg://dev:dev@localhost:5432/app")# ... template run_migrations_online() below uses an async engine ...The shared engine and model
Section titled “The shared engine and model”from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
ASYNCPG_DSN = "postgresql://dev:dev@localhost:5432/app"SQLA_DSN = "postgresql+asyncpg://dev:dev@localhost:5432/app"
engine = create_async_engine(SQLA_DSN, pool_pre_ping=True)SessionLocal = async_sessionmaker(engine, expire_on_commit=False)import datetime as dt
from sqlalchemy import String, funcfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): pass
class Task(Base): __tablename__ = "tasks"
id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(255)) done: Mapped[bool] = mapped_column(default=False, server_default="false") created_at: Mapped[dt.datetime] = mapped_column(server_default=func.now())The same five operations, four ways
Section titled “The same five operations, four ways”Here is the heart of the exercise: create, get, list_tasks, update, and
delete, written once per approach. Flip the tabs and watch the same five
operations change shape — the amount of magic goes up and the amount of
hand-written SQL goes down as you move left to right.
Raw async SQL and a hand-written row-to-dict map. No ORM, no magic: you write
the INSERT ... RETURNING, you pull each column out of the Record yourself,
you read the affected-row count. This is exactly Go’s pgx or Node’s
postgres.js — maximum control, maximum boilerplate. Note Postgres-native
$1, $2 placeholders.
import asyncpg
from app.db import ASYNCPG_DSN
async def make_pool() -> asyncpg.Pool: return await asyncpg.create_pool(ASYNCPG_DSN, min_size=2, max_size=10)
async def create(pool: asyncpg.Pool, title: str) -> dict: row = await pool.fetchrow( "INSERT INTO tasks (title) VALUES ($1) RETURNING id, title, done, created_at", title, ) return dict(row)
async def get(pool: asyncpg.Pool, task_id: int) -> dict | None: row = await pool.fetchrow("SELECT * FROM tasks WHERE id = $1", task_id) return dict(row) if row else None
async def list_tasks(pool: asyncpg.Pool) -> list[dict]: rows = await pool.fetch("SELECT * FROM tasks ORDER BY id") return [dict(r) for r in rows]
async def update(pool: asyncpg.Pool, task_id: int, title: str, done: bool) -> bool: status = await pool.execute( "UPDATE tasks SET title = $1, done = $2 WHERE id = $3", title, done, task_id ) return status.endswith("1") # "UPDATE 1"
async def delete(pool: asyncpg.Pool, task_id: int) -> bool: status = await pool.execute("DELETE FROM tasks WHERE id = $1", task_id) return status.endswith("1") # "DELETE 1"The typed expression language. You build queries from Table/Column objects —
select, insert, update, delete — that read like SQL but are composable
Python the type checker can see. No string SQL, no ORM entity tracking. This is
the Knex/Kysely sweet spot. engine.begin() wraps each call in a transaction.
import sqlalchemy as safrom sqlalchemy import delete as sa_deletefrom sqlalchemy import insert, select, update as sa_updatefrom sqlalchemy.ext.asyncio import AsyncEngine
# A standalone Core table (mirrors the ORM model; could also reuse Task.__table__)metadata = sa.MetaData()tasks = sa.Table( "tasks", metadata, sa.Column("id", sa.BigInteger, primary_key=True), sa.Column("title", sa.String(255), nullable=False), sa.Column("done", sa.Boolean, nullable=False), sa.Column("created_at", sa.DateTime(timezone=True)),)
async def create(engine: AsyncEngine, title: str) -> dict: async with engine.begin() as conn: result = await conn.execute( insert(tasks).values(title=title).returning(*tasks.c) ) return dict(result.mappings().one())
async def get(engine: AsyncEngine, task_id: int) -> dict | None: async with engine.connect() as conn: result = await conn.execute(select(tasks).where(tasks.c.id == task_id)) row = result.mappings().one_or_none() return dict(row) if row else None
async def list_tasks(engine: AsyncEngine) -> list[dict]: async with engine.connect() as conn: result = await conn.execute(select(tasks).order_by(tasks.c.id)) return [dict(r) for r in result.mappings()]
async def update(engine: AsyncEngine, task_id: int, title: str, done: bool) -> bool: async with engine.begin() as conn: result = await conn.execute( sa_update(tasks).where(tasks.c.id == task_id).values(title=title, done=done) ) return result.rowcount > 0
async def delete(engine: AsyncEngine, task_id: int) -> bool: async with engine.begin() as conn: result = await conn.execute(sa_delete(tasks).where(tasks.c.id == task_id)) return result.rowcount > 0The full ORM end. You query mapped Task objects with select() and
session.scalars(), and updates are just attribute mutations — the unit of work
emits the SQL at flush. The identity map, dirty tracking, and lazy-by-default
(disabled here) come along. This is GORM/Prisma/TypeORM territory, the modern
2.0 style (no legacy Query).
from sqlalchemy import selectfrom sqlalchemy.ext.asyncio import AsyncSession
from app.models import Task
async def create(session: AsyncSession, title: str) -> Task: task = Task(title=title) session.add(task) await session.commit() await session.refresh(task) return task
async def get(session: AsyncSession, task_id: int) -> Task | None: return await session.get(Task, task_id)
async def list_tasks(session: AsyncSession) -> list[Task]: result = await session.scalars(select(Task).order_by(Task.id)) return list(result.all())
async def update(session: AsyncSession, task_id: int, title: str, done: bool) -> bool: task = await session.get(Task, task_id) if task is None: return False task.title = title # dirty tracking — no explicit UPDATE task.done = done await session.commit() return True
async def delete(session: AsyncSession, task_id: int) -> bool: task = await session.get(Task, task_id) if task is None: return False await session.delete(task) await session.commit() return TrueOne class is both the table and the Pydantic model. The CRUD reads almost like
the ORM version — because SQLModel is the SQLAlchemy ORM underneath — but the
Task here also validates, serializes, and produces an OpenAPI schema for free,
which is why it shines in FastAPI. session.exec() is SQLModel’s typed wrapper
over session.execute().
from sqlmodel import Field, SQLModel, selectfrom sqlmodel.ext.asyncio.session import AsyncSession
class Task(SQLModel, table=True): __tablename__ = "tasks" id: int | None = Field(default=None, primary_key=True) title: str = Field(max_length=255) done: bool = Field(default=False)
async def create(session: AsyncSession, title: str) -> Task: task = Task(title=title) session.add(task) await session.commit() await session.refresh(task) return task
async def get(session: AsyncSession, task_id: int) -> Task | None: return await session.get(Task, task_id)
async def list_tasks(session: AsyncSession) -> list[Task]: result = await session.exec(select(Task).order_by(Task.id)) return list(result.all())
async def update(session: AsyncSession, task_id: int, title: str, done: bool) -> bool: task = await session.get(Task, task_id) if task is None: return False task.title = title task.done = done session.add(task) await session.commit() return True
async def delete(session: AsyncSession, task_id: int) -> bool: task = await session.get(Task, task_id) if task is None: return False await session.delete(task) await session.commit() return TrueOne driver, four backends
Section titled “One driver, four backends”main.py runs the same five operations through each module and prints the result
so you can see they’re identical. The created_at column is filled server-side
by Postgres, so every approach reads back the same row.
import asyncio
from sqlmodel.ext.asyncio.session import AsyncSession as SQLModelSession
from app import crud_asyncpg, crud_core, crud_orm, crud_sqlmodelfrom app.db import SessionLocal, engine
async def main() -> None: # Way 1 — raw asyncpg (its own pool) pool = await crud_asyncpg.make_pool() t = await crud_asyncpg.create(pool, "asyncpg task") await crud_asyncpg.update(pool, t["id"], "asyncpg task (done)", True) print("asyncpg :", await crud_asyncpg.get(pool, t["id"])) await crud_asyncpg.delete(pool, t["id"]) await pool.close()
# Way 2 — SQLAlchemy Core (uses the shared engine) t2 = await crud_core.create(engine, "core task") print("core :", await crud_core.get(engine, t2["id"])) await crud_core.delete(engine, t2["id"])
# Way 3 — SQLAlchemy ORM (session per operation) async with SessionLocal() as session: t3 = await crud_orm.create(session, "orm task") print("orm :", (await crud_orm.get(session, t3.id)).title) await crud_orm.delete(session, t3.id)
# Way 4 — SQLModel async with SQLModelSession(engine) as session: t4 = await crud_sqlmodel.create(session, "sqlmodel task") print("sqlmodel:", (await crud_sqlmodel.get(session, t4.id)).title) await crud_sqlmodel.delete(session, t4.id)
await engine.dispose()
if __name__ == "__main__": asyncio.run(main())What changed, and what it tells you
Section titled “What changed, and what it tells you”Read the tabs top to bottom and the trade-off is physical:
- asyncpg — most explicit. You own every SQL string and every column read. Zero magic, highest performance ceiling, but a typo in a column name is a runtime error and you maintain the mapping by hand.
- SQLAlchemy Core — typed query builder. No string SQL, no entity overhead;
the
Tableis the single source of truth and the expressions read like the SQL they generate. The reporting/dynamic-query sweet spot. - SQLAlchemy ORM — fewest moving parts for a rich domain.
updateis just mutating an attribute; the unit of work does the SQL. The cost is hidden behavior (identity map, flush timing) you occasionally have to reason about. - SQLModel — the ORM ergonomics plus Pydantic validation in one class. Least boilerplate for FastAPI CRUD; you trade some depth (you fall back to SQLAlchemy for advanced cases).
There’s no universal winner. asyncpg for hot paths and bulk, Core for reporting, the ORM for rich domains, SQLModel for FastAPI CRUD — and a real app can mix them over the same engine.
Run it
Section titled “Run it”-
Start Postgres from the shared infra (one-time per session):
Terminal window cd shared-infradocker compose up -d postgres -
Apply the migration to create the
taskstable:Terminal window uv run alembic upgrade head -
Run all four approaches:
Terminal window uv run python -m app.mainYou’ll see one line per approach, each reading back a row it just wrote:
asyncpg : {'id': 1, 'title': 'asyncpg task (done)', 'done': True, ...}core : {'id': 2, 'title': 'core task', 'done': False, ...}orm : orm tasksqlmodel: sqlmodel task -
Lint, format, and type-check:
Terminal window uv run ruff format .uv run ruff check .uv run ty check src -
Roll the schema back when you’re done experimenting:
Terminal window uv run alembic downgrade -1