PostgreSQL Access
Everything you know about database access from TypeScript (pg/postgres.js, Drizzle, Prisma) and Go (database/sql + pgx, sqlc, GORM) has a Python counterpart. The big shift for the 2026 Python stack: it is async-first. You talk to Postgres over a non-blocking driver inside an event loop, the same way you do in Node, not with the blocking psycopg2-and-a-thread-pool model older Python tutorials assume.
This module covers the same tiny domain — a tasks table — four ways, from the lowest-level driver up to the highest-level model, so you can feel the trade-offs and pick deliberately:
- Raw asyncpg — a connection pool and parametrized queries. Fastest, lowest-level. The peer of Go’s
pgxand Node’spostgres.js. - SQLAlchemy 2.0 Core — a typed SQL expression language (
select(),insert()) over an async engine. The peer of Knex/Kysely or Go’ssquirrel. - SQLAlchemy 2.0 ORM (async) —
Mapped[...]models,AsyncSession, relationships, identity map. The peer of TypeORM/Prisma or GORM. - SQLModel — Pydantic v2 and SQLAlchemy fused into one model class. Optimized for FastAPI CRUD.
The Python database landscape
Section titled “The Python database landscape”Mapping from what you know
Section titled “Mapping from what you know”| Your background | Closest Python equivalent | Style |
|---|---|---|
pgx / database/sql (Go) | asyncpg | Raw async SQL, manual mapping |
pg / postgres.js (TS) | asyncpg | Raw async SQL, connection pool |
sqlc (Go) | (no exact peer) — closest is SQLAlchemy Core + typed rows | Typed queries |
| Knex / Kysely (TS) | SQLAlchemy 2.0 Core | Type-checked query builder |
squirrel (Go) | SQLAlchemy 2.0 Core | Expression-language builder |
| GORM (Go) | SQLAlchemy 2.0 ORM | Full ORM, identity map |
| TypeORM / Prisma (TS) | SQLAlchemy 2.0 ORM or SQLModel | ORM with model classes |
| Drizzle (TS) | SQLAlchemy 2.0 Core (closest typed builder) | Schema-as-code, typed |
| golang-migrate / Prisma Migrate | Alembic | Versioned migrations |
The four approaches at a glance
Section titled “The four approaches at a glance”The four sit on a spectrum from raw driver control on the left to model-level convenience on the right:
flowchart LR A["asyncpg<br/>raw async SQL, manual mapping<br/>lowest level, fastest"] B["SQLAlchemy Core<br/>typed expression language, no ORM<br/>full SQL control, no entity overhead"] C["SQLAlchemy ORM<br/>Mapped[] models, identity map<br/>units of work, relations, rich domain logic"] D["SQLModel<br/>Pydantic + ORM, one class<br/>FastAPI-native CRUD, least boilerplate"] A -->|more convenience| B -->|more convenience| C -->|more convenience| D
| asyncpg | SQLAlchemy Core | SQLAlchemy ORM | SQLModel | |
|---|---|---|---|---|
| Level | Driver | Query builder | ORM | ORM + validation |
| Returns | Record (dict-like) | Row tuples | mapped objects | mapped objects (Pydantic) |
| Type safety | manual | typed columns | Mapped[T] | Mapped[T] + Pydantic |
| Identity map / dirty tracking | No | No | Yes | Yes |
| Relationships | manual joins | manual joins | relationship() + eager loading | Relationship() |
| Best for | hot paths, bulk, full SQL | reporting, dynamic SQL | rich domains | FastAPI CRUD |
Async, not blocking: the mental model
Section titled “Async, not blocking: the mental model”The one thing to internalize coming from Node: in 2026 Python you use a non-blocking driver inside the asyncio event loop, just like pg in Node. Every query is awaited; nothing blocks the loop. If you’ve seen blocking psycopg2 in old tutorials, that’s the legacy path — we don’t use it here.
Coming from Go, the difference is that there is no implicit parallelism: one event loop, one thread (see the Async module). A connection pool hands out connections to coroutines, and you must not hand the same connection or ORM session to two coroutines running concurrently — more on that in the gotchas.
// postgres.js — pooled, parametrized, asyncimport postgres from "postgres";
const sql = postgres("postgres://dev:dev@localhost:5432/app");
const rows = await sql` SELECT id, title, done FROM tasks WHERE done = ${false}`;for (const row of rows) console.log(row.id, row.title);
await sql.end();// pgx pool — parametrized, context-awarectx := context.Background()pool, err := pgxpool.New(ctx, "postgres://dev:dev@localhost:5432/app")if err != nil { log.Fatal(err)}defer pool.Close()
rows, err := pool.Query(ctx, "SELECT id, title, done FROM tasks WHERE done = $1", false)if err != nil { log.Fatal(err)}defer rows.Close()for rows.Next() { var id int64 var title string var done bool rows.Scan(&id, &title, &done) fmt.Println(id, title)}# asyncpg — pooled, parametrized, asyncimport asyncioimport asyncpg
async def main() -> None: pool = await asyncpg.create_pool("postgresql://dev:dev@localhost:5432/app") async with pool.acquire() as conn: rows = await conn.fetch( "SELECT id, title, done FROM tasks WHERE done = $1", False ) for row in rows: print(row["id"], row["title"]) await pool.close()
asyncio.run(main())Note that asyncpg, like pgx and Postgres itself, uses native $1, $2 placeholders — not the ? of JDBC or sqlite. SQLAlchemy abstracts placeholders away entirely.
1. Raw asyncpg
Section titled “1. Raw asyncpg”asyncpg is a from-scratch async Postgres driver — it does not implement DB-API, it speaks the Postgres binary protocol directly, and it is one of the fastest Postgres drivers in any language. It is the Python peer of Go’s pgx and Node’s postgres.js: a pool, parametrized queries, and rows you map yourself.
Connection pool
Section titled “Connection pool”You create one pool at startup and share it for the whole process — never open a connection per request.
import asyncpg
# Create once, at startup, then reuse for the whole process.async def make_pool() -> asyncpg.Pool: return await asyncpg.create_pool( "postgresql://dev:dev@localhost:5432/app", min_size=5, # connections kept warm max_size=20, # ceiling under load max_inactive_connection_lifetime=300.0, # recycle idle conns after 5 min command_timeout=30.0, )| Concern | Go (pgxpool) | TS (postgres.js) | Python (asyncpg) |
|---|---|---|---|
| Create pool | pgxpool.New | postgres(url, { max }) | asyncpg.create_pool |
| Max connections | pool_max_conns in URL | max | max_size |
| Min / warm | pool_min_conns | idle_timeout (inverse) | min_size |
| Acquire | pool.Acquire(ctx) | implicit per query | pool.acquire() |
| Query timeout | context.WithTimeout | { timeout } | command_timeout |
The query methods
Section titled “The query methods”asyncpg gives you four read shapes, all parametrized with $n:
async with pool.acquire() as conn: # All matching rows -> list[Record] rows = await conn.fetch("SELECT * FROM tasks WHERE done = $1", False)
# Single row or None row = await conn.fetchrow("SELECT * FROM tasks WHERE id = $1", 1)
# Single scalar value (first column of first row) count = await conn.fetchval("SELECT count(*) FROM tasks")
# No result set (INSERT/UPDATE/DELETE without RETURNING) await conn.execute("DELETE FROM tasks WHERE id = $1", 1)A Record is dict-like and tuple-like: row["title"] and row[1] both work. There is no automatic mapping to a class — you do it yourself, which is the whole point of staying at this level.
# Postgres-native IN: pass a list to ANY()ids = [1, 2, 3]rows = await conn.fetch("SELECT * FROM tasks WHERE id = ANY($1)", ids)Transactions
Section titled “Transactions”async with pool.acquire() as conn: async with conn.transaction(): await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, 1) await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, 2) # commits on clean exit, rolls back if the block raisesThis is exactly pgx’s conn.Begin(ctx) / tx.Commit() discipline, but with async with doing the commit/rollback for you, the way Go’s defer tx.Rollback() pattern wishes it could.
2. SQLAlchemy 2.0 Core
Section titled “2. SQLAlchemy 2.0 Core”SQLAlchemy 2.0 is two things in one package: Core (a typed SQL expression language and connection engine) and the ORM built on top of it. Core is the Knex/Kysely layer — you build queries from Python objects, get full SQL control, but no entity tracking. With the 2.0 async engine it runs on top of asyncpg.
Engine and table metadata
Section titled “Engine and table metadata”import sqlalchemy as safrom sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine( "postgresql+asyncpg://dev:dev@localhost:5432/app", pool_size=20, max_overflow=10, pool_pre_ping=True, # validate a conn before handing it out echo=False, # set True to log every SQL statement)
metadata = sa.MetaData()
tasks = sa.Table( "tasks", metadata, sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), sa.Column("title", sa.String(255), nullable=False), sa.Column("done", sa.Boolean, nullable=False, server_default=sa.false()), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),)Note postgresql+asyncpg:// — the +asyncpg part tells SQLAlchemy to use asyncpg as the driver. Drop it and you get the blocking default; the async engine requires an async driver.
Querying with the expression language
Section titled “Querying with the expression language”from sqlalchemy import select, insert, update, delete
async with engine.begin() as conn: # begin() => transaction, auto-commit on exit # INSERT ... RETURNING id result = await conn.execute( insert(tasks).values(title="Write docs").returning(tasks.c.id) ) new_id = result.scalar_one()
# SELECT with a typed WHERE result = await conn.execute( select(tasks).where(tasks.c.done == False).order_by(tasks.c.id) # noqa: E712 ) for row in result.mappings(): # .mappings() => dict-like rows print(row["title"])
# UPDATE await conn.execute( update(tasks).where(tasks.c.id == new_id).values(done=True) )
# DELETE await conn.execute(delete(tasks).where(tasks.c.id == new_id))The query objects are composable Python — select(tasks).where(...) builds an AST, not a string, so a typo in a column name (tasks.c.titel) is caught the moment it runs, and ty/mypy can flag the obvious mistakes statically. This is the Knex/Drizzle value proposition.
3. SQLAlchemy 2.0 ORM (async)
Section titled “3. SQLAlchemy 2.0 ORM (async)”The ORM adds an identity map, change tracking (the “unit of work”), and relationship() navigation on top of Core. The modern 2.0 style uses Mapped[...] annotations and mapped_column() — fully typed, checked by ty/mypy.
Declarative models with Mapped
Section titled “Declarative models with Mapped”// TypeORM entity@Entity("tasks")export class Task { @PrimaryGeneratedColumn("increment") id: number;
@Column({ length: 255 }) title: string;
@Column({ default: false }) done: boolean;
@CreateDateColumn() createdAt: Date;}// GORM modeltype Task struct { ID int64 `gorm:"primaryKey"` Title string Done bool `gorm:"default:false"` CreatedAt time.Time}import datetime as dtfrom sqlalchemy import String, funcfrom sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): pass
class Task(Base): __tablename__ = "tasks"
id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(255)) done: Mapped[bool] = mapped_column(default=False, server_default="false") created_at: Mapped[dt.datetime] = mapped_column(server_default=func.now())The Mapped[int] / Mapped[str] annotations are how SQLAlchemy 2.0 infers column types and gives you a typed attribute — task.title is a str to the type checker, task.done is a bool. Mapped[str | None] makes a column nullable. This is a genuine improvement over the old untyped Column(...) class attributes.
Async sessions
Section titled “Async sessions”The session is the unit-of-work scope. You build it from an async_sessionmaker and open one per logical operation (per request, in a web app).
from sqlalchemy.ext.asyncio import ( create_async_engine, async_sessionmaker, AsyncSession,)
engine = create_async_engine("postgresql+asyncpg://dev:dev@localhost:5432/app")
# expire_on_commit=False keeps objects usable after commit (important for web responses)SessionLocal = async_sessionmaker(engine, expire_on_commit=False)CRUD with select() + session.scalars()
Section titled “CRUD with select() + session.scalars()”from sqlalchemy import select
async with SessionLocal() as session: async with session.begin(): # one transaction; commits on clean exit # CREATE task = Task(title="Write docs") session.add(task) await session.flush() # assigns task.id without committing yet
# READ one — get by PK uses the identity map task = await session.get(Task, 1)
# READ list result = await session.scalars( select(Task).where(Task.done == False).order_by(Task.id) # noqa: E712 ) tasks = result.all() # list[Task], fully typed
# UPDATE — just mutate; the unit of work flushes the change async with session.begin(): task = await session.get(Task, 1) task.done = True # no explicit UPDATE — dirty tracking emits it at flush
# DELETE async with session.begin(): task = await session.get(Task, 1) await session.delete(task)session.scalars(select(Task)) returns the Task objects (the ORM unwraps the single-entity rows for you); session.execute(select(Task)) returns Row tuples instead. Use scalars when you select whole entities.
Relationships and the N+1 trap
Section titled “Relationships and the N+1 trap”Add a parent and a relationship():
from sqlalchemy import ForeignKeyfrom sqlalchemy.orm import relationship
class Project(Base): __tablename__ = "projects" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String(255)) tasks: Mapped[list["Task"]] = relationship(back_populates="project")
class Task(Base): __tablename__ = "tasks" id: Mapped[int] = mapped_column(primary_key=True) title: Mapped[str] = mapped_column(String(255)) project_id: Mapped[int | None] = mapped_column(ForeignKey("projects.id")) project: Mapped["Project | None"] = relationship(back_populates="tasks")The N+1 problem is the same one GORM and TypeORM have: loop over N projects, touch project.tasks on each, and you fire 1 + N queries. The fix is eager loading with selectinload (one extra query, batched by IN) or joinedload (a single JOIN):
from sqlalchemy.orm import selectinload
# 1 query for projects + 1 batched query for all their tasks = 2 totalresult = await session.scalars( select(Project).options(selectinload(Project.tasks)))for project in result: print(project.name, len(project.tasks)) # no extra query| N+1 fix | When to use | SQL emitted |
|---|---|---|
selectinload(rel) | one-to-many, large child sets | parent query + WHERE child.fk IN (...) |
joinedload(rel) | many-to-one, small fan-out | single LEFT JOIN |
.where(...).join(...) (manual) | full control, reporting | whatever you write |
4. SQLModel
Section titled “4. SQLModel”SQLModel is from the author of FastAPI. One class is both a Pydantic v2 model (validation, serialization, OpenAPI schema) and a SQLAlchemy ORM model (a real table). For FastAPI CRUD this collapses the usual “DB model + request schema + response schema” triplet into far less code.
from sqlmodel import SQLModel, Field
class Task(SQLModel, table=True): id: int | None = Field(default=None, primary_key=True) title: str = Field(max_length=255) done: bool = Field(default=False)table=True makes it a real table; the same class without it is a plain Pydantic model you can use as a request body. Querying uses SQLModel’s select() (a thin wrapper over SQLAlchemy’s) and the async session:
from sqlmodel import selectfrom sqlmodel.ext.asyncio.session import AsyncSession
async with AsyncSession(engine) as session: task = Task(title="Write docs") # also validates as a Pydantic model session.add(task) await session.commit() await session.refresh(task)
result = await session.exec(select(Task).where(Task.done == False)) # noqa: E712 for t in result.all(): print(t.title)The tradeoffs
Section titled “The tradeoffs”| SQLAlchemy ORM | SQLModel | |
|---|---|---|
| Model definition | Mapped[...] + mapped_column | one Field(...) per column |
| Validation | none (separate Pydantic schema) | built-in (it is Pydantic) |
| FastAPI integration | manual response models | model doubles as request/response |
| Escape hatch to Core | full | full (it’s SQLAlchemy underneath) |
| Maturity / docs for advanced cases | very high | thinner; you fall back to SQLAlchemy docs |
When to use which
Section titled “When to use which”| Factor | asyncpg | SQLAlchemy Core | SQLAlchemy ORM | SQLModel |
|---|---|---|---|---|
| Abstraction | none (raw SQL) | typed query builder | full ORM | ORM + validation |
| Type safety | manual | typed columns | Mapped[T] | Mapped[T] + Pydantic |
| Boilerplate | medium (map rows) | low | low | lowest |
| Performance ceiling | highest | high | good (object overhead) | good |
| Complex/dynamic SQL | easy (write it) | easy (compose it) | possible (drop to Core) | drop to Core |
| Rich domain + relations | painful | manual | excellent | good |
| FastAPI request/response | manual schemas | manual schemas | manual schemas | built in |
| Comparable to | pgx / postgres.js | Knex / Kysely / squirrel | GORM / TypeORM | Prisma-ish for FastAPI |
Reach for raw asyncpg on hot paths, bulk loads (copy_records_to_table), and queries where you want to own every byte of SQL.
Reach for SQLAlchemy Core for reporting and dynamic query building without entity overhead.
Reach for the SQLAlchemy ORM for rich domains with relationships, where the identity map and unit-of-work pay for themselves.
Reach for SQLModel when you’re building FastAPI CRUD and want one model to serve as table, validation, and OpenAPI schema.
And yes, you can mix them over the same engine/pool: ORM for domain writes, Core or asyncpg for the reporting query.
Connection pooling, sessions, and the gotchas
Section titled “Connection pooling, sessions, and the gotchas”Pool sizing
Section titled “Pool sizing”The defaults are conservative. Size the pool to your concurrency, and remember the hard ceiling is Postgres’s max_connections (default 100) shared across all your app instances.
| Knob | SQLAlchemy async engine | asyncpg pool | Rule of thumb |
|---|---|---|---|
| Steady-state size | pool_size | min_size | match expected concurrent queries |
| Burst headroom | max_overflow | max_size - min_size | small buffer for spikes |
| Liveness check | pool_pre_ping=True | built in | on; cheap insurance vs. stale conns |
| Recycle | pool_recycle=1800 | max_inactive_connection_lifetime | under any proxy idle timeout |
Transactions
Section titled “Transactions”The modern idiom is async with session.begin() — it opens a transaction and commits on a clean exit, rolls back on exception. No manual commit()/rollback().
async with SessionLocal() as session: async with session.begin(): session.add(Task(title="a")) session.add(Task(title="b")) # both committed together, or neitherDon’t share a session (or connection) across tasks
Section titled “Don’t share a session (or connection) across tasks”This is the asyncio-specific footgun, and it bites Go developers who reach for the goroutine-style “fan out over a shared handle” pattern.
import asyncio
# WRONG — shared session across concurrent tasksasync def bad(session): async with asyncio.TaskGroup() as tg: tg.create_task(session.scalars(select(Task))) # boom: concurrent use tg.create_task(session.scalars(select(Project)))
# RIGHT — a fresh session per taskasync def fetch_tasks(): async with SessionLocal() as s: return (await s.scalars(select(Task))).all()
async def fetch_projects(): async with SessionLocal() as s: return (await s.scalars(select(Project))).all()
async def good(): async with asyncio.TaskGroup() as tg: t1 = tg.create_task(fetch_tasks()) t2 = tg.create_task(fetch_projects()) return t1.result(), t2.result()Migrations with Alembic
Section titled “Migrations with Alembic”Alembic is SQLAlchemy’s migration tool — the peer of Prisma Migrate, golang-migrate, or Rails migrations. It autogenerates migrations by diffing your models against the live database, and applies them up or down.
| Tool | Style | Schema source |
|---|---|---|
| Prisma Migrate (TS) | schema-first, auto SQL | schema.prisma |
| golang-migrate (Go) | hand-written SQL files | *.up.sql / *.down.sql |
| Alembic (Python) | Python migration scripts, autogenerated from models | your Base.metadata |
Init, autogenerate, upgrade, downgrade
Section titled “Init, autogenerate, upgrade, downgrade”-
Scaffold an async Alembic environment:
Terminal window uv run alembic init -t async alembicThe
-t asynctemplate wires up an async engine inenv.py— essential, since our engine is async. -
Point Alembic at your metadata. In
alembic/env.py, set the target and the URL:alembic/env.py from app.models import Basetarget_metadata = Base.metadata# set the async URL (or read it from your settings / env)config.set_main_option("sqlalchemy.url", "postgresql+asyncpg://dev:dev@localhost:5432/app") -
Autogenerate a migration by diffing models vs. database:
Terminal window uv run alembic revision --autogenerate -m "create tasks table" -
Apply it (and roll back if needed):
Terminal window uv run alembic upgrade head # apply all pendinguv run alembic downgrade -1 # undo the last oneuv run alembic current # show applied revision
A generated migration is a Python module with upgrade() and downgrade():
def upgrade() -> None: op.create_table( "tasks", sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True), sa.Column("title", sa.String(length=255), nullable=False), sa.Column("done", sa.Boolean(), server_default=sa.false(), nullable=False), sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()), )
def downgrade() -> None: op.drop_table("tasks")Integrating with FastAPI
Section titled “Integrating with FastAPI”The standard pattern (tie this to the FastAPI module): a single engine created at app startup via the lifespan handler, and a session-per-request injected with Depends.
from contextlib import asynccontextmanagerfrom collections.abc import AsyncIterator
from fastapi import FastAPI, Depends, HTTPExceptionfrom sqlalchemy import selectfrom sqlalchemy.ext.asyncio import ( create_async_engine, async_sessionmaker, AsyncSession,)
from app.models import Task
engine = create_async_engine("postgresql+asyncpg://dev:dev@localhost:5432/app")SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
@asynccontextmanagerasync def lifespan(app: FastAPI) -> AsyncIterator[None]: # startup: engine/pool is ready (created above) yield # shutdown: drain the pool cleanly await engine.dispose()
app = FastAPI(lifespan=lifespan)
# Session-per-request dependency — one fresh session, closed after the responseasync def get_session() -> AsyncIterator[AsyncSession]: async with SessionLocal() as session: yield session
@app.get("/tasks/{task_id}")async def get_task(task_id: int, session: AsyncSession = Depends(get_session)): task = await session.get(Task, task_id) if task is None: raise HTTPException(404, "task not found") return task
@app.get("/tasks")async def list_tasks(session: AsyncSession = Depends(get_session)): return (await session.scalars(select(Task).order_by(Task.id))).all()The key moves: the engine (and its pool) is a process-wide singleton created once and disposed in lifespan; each request gets its own session from Depends(get_session), which closes it (returning the connection to the pool) when the response is sent. This satisfies the “one session per task” rule automatically — each request is its own task.
Practice
Section titled “Practice”Implement the same CRUD four ways against the shared Postgres, with an Alembic migration to create the table.