Skip to content

PostgreSQL Access

Everything you know about database access from TypeScript (pg/postgres.js, Drizzle, Prisma) and Go (database/sql + pgx, sqlc, GORM) has a Python counterpart. The big shift for the 2026 Python stack: it is async-first. You talk to Postgres over a non-blocking driver inside an event loop, the same way you do in Node, not with the blocking psycopg2-and-a-thread-pool model older Python tutorials assume.

This module covers the same tiny domain — a tasks table — four ways, from the lowest-level driver up to the highest-level model, so you can feel the trade-offs and pick deliberately:

  1. Raw asyncpg — a connection pool and parametrized queries. Fastest, lowest-level. The peer of Go’s pgx and Node’s postgres.js.
  2. SQLAlchemy 2.0 Core — a typed SQL expression language (select(), insert()) over an async engine. The peer of Knex/Kysely or Go’s squirrel.
  3. SQLAlchemy 2.0 ORM (async)Mapped[...] models, AsyncSession, relationships, identity map. The peer of TypeORM/Prisma or GORM.
  4. SQLModel — Pydantic v2 and SQLAlchemy fused into one model class. Optimized for FastAPI CRUD.
Your backgroundClosest Python equivalentStyle
pgx / database/sql (Go)asyncpgRaw async SQL, manual mapping
pg / postgres.js (TS)asyncpgRaw async SQL, connection pool
sqlc (Go)(no exact peer) — closest is SQLAlchemy Core + typed rowsTyped queries
Knex / Kysely (TS)SQLAlchemy 2.0 CoreType-checked query builder
squirrel (Go)SQLAlchemy 2.0 CoreExpression-language builder
GORM (Go)SQLAlchemy 2.0 ORMFull ORM, identity map
TypeORM / Prisma (TS)SQLAlchemy 2.0 ORM or SQLModelORM with model classes
Drizzle (TS)SQLAlchemy 2.0 Core (closest typed builder)Schema-as-code, typed
golang-migrate / Prisma MigrateAlembicVersioned migrations

The four sit on a spectrum from raw driver control on the left to model-level convenience on the right:

The four ways, on a control-vs-convenience spectrum
Rendering diagram…
asyncpgSQLAlchemy CoreSQLAlchemy ORMSQLModel
LevelDriverQuery builderORMORM + validation
ReturnsRecord (dict-like)Row tuplesmapped objectsmapped objects (Pydantic)
Type safetymanualtyped columnsMapped[T]Mapped[T] + Pydantic
Identity map / dirty trackingNoNoYesYes
Relationshipsmanual joinsmanual joinsrelationship() + eager loadingRelationship()
Best forhot paths, bulk, full SQLreporting, dynamic SQLrich domainsFastAPI CRUD

The one thing to internalize coming from Node: in 2026 Python you use a non-blocking driver inside the asyncio event loop, just like pg in Node. Every query is awaited; nothing blocks the loop. If you’ve seen blocking psycopg2 in old tutorials, that’s the legacy path — we don’t use it here.

Coming from Go, the difference is that there is no implicit parallelism: one event loop, one thread (see the Async module). A connection pool hands out connections to coroutines, and you must not hand the same connection or ORM session to two coroutines running concurrently — more on that in the gotchas.

// postgres.js — pooled, parametrized, async
import postgres from "postgres";
const sql = postgres("postgres://dev:dev@localhost:5432/app");
const rows = await sql`
SELECT id, title, done FROM tasks WHERE done = ${false}
`;
for (const row of rows) console.log(row.id, row.title);
await sql.end();

Note that asyncpg, like pgx and Postgres itself, uses native $1, $2 placeholders — not the ? of JDBC or sqlite. SQLAlchemy abstracts placeholders away entirely.

asyncpg is a from-scratch async Postgres driver — it does not implement DB-API, it speaks the Postgres binary protocol directly, and it is one of the fastest Postgres drivers in any language. It is the Python peer of Go’s pgx and Node’s postgres.js: a pool, parametrized queries, and rows you map yourself.

You create one pool at startup and share it for the whole process — never open a connection per request.

app/db_asyncpg.py
import asyncpg
# Create once, at startup, then reuse for the whole process.
async def make_pool() -> asyncpg.Pool:
return await asyncpg.create_pool(
"postgresql://dev:dev@localhost:5432/app",
min_size=5, # connections kept warm
max_size=20, # ceiling under load
max_inactive_connection_lifetime=300.0, # recycle idle conns after 5 min
command_timeout=30.0,
)
ConcernGo (pgxpool)TS (postgres.js)Python (asyncpg)
Create poolpgxpool.Newpostgres(url, { max })asyncpg.create_pool
Max connectionspool_max_conns in URLmaxmax_size
Min / warmpool_min_connsidle_timeout (inverse)min_size
Acquirepool.Acquire(ctx)implicit per querypool.acquire()
Query timeoutcontext.WithTimeout{ timeout }command_timeout

asyncpg gives you four read shapes, all parametrized with $n:

async with pool.acquire() as conn:
# All matching rows -> list[Record]
rows = await conn.fetch("SELECT * FROM tasks WHERE done = $1", False)
# Single row or None
row = await conn.fetchrow("SELECT * FROM tasks WHERE id = $1", 1)
# Single scalar value (first column of first row)
count = await conn.fetchval("SELECT count(*) FROM tasks")
# No result set (INSERT/UPDATE/DELETE without RETURNING)
await conn.execute("DELETE FROM tasks WHERE id = $1", 1)

A Record is dict-like and tuple-like: row["title"] and row[1] both work. There is no automatic mapping to a class — you do it yourself, which is the whole point of staying at this level.

# Postgres-native IN: pass a list to ANY()
ids = [1, 2, 3]
rows = await conn.fetch("SELECT * FROM tasks WHERE id = ANY($1)", ids)
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, 1)
await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, 2)
# commits on clean exit, rolls back if the block raises

This is exactly pgx’s conn.Begin(ctx) / tx.Commit() discipline, but with async with doing the commit/rollback for you, the way Go’s defer tx.Rollback() pattern wishes it could.

SQLAlchemy 2.0 is two things in one package: Core (a typed SQL expression language and connection engine) and the ORM built on top of it. Core is the Knex/Kysely layer — you build queries from Python objects, get full SQL control, but no entity tracking. With the 2.0 async engine it runs on top of asyncpg.

app/db_core.py
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"postgresql+asyncpg://dev:dev@localhost:5432/app",
pool_size=20,
max_overflow=10,
pool_pre_ping=True, # validate a conn before handing it out
echo=False, # set True to log every SQL statement
)
metadata = sa.MetaData()
tasks = sa.Table(
"tasks",
metadata,
sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True),
sa.Column("title", sa.String(255), nullable=False),
sa.Column("done", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)

Note postgresql+asyncpg:// — the +asyncpg part tells SQLAlchemy to use asyncpg as the driver. Drop it and you get the blocking default; the async engine requires an async driver.

from sqlalchemy import select, insert, update, delete
async with engine.begin() as conn: # begin() => transaction, auto-commit on exit
# INSERT ... RETURNING id
result = await conn.execute(
insert(tasks).values(title="Write docs").returning(tasks.c.id)
)
new_id = result.scalar_one()
# SELECT with a typed WHERE
result = await conn.execute(
select(tasks).where(tasks.c.done == False).order_by(tasks.c.id) # noqa: E712
)
for row in result.mappings(): # .mappings() => dict-like rows
print(row["title"])
# UPDATE
await conn.execute(
update(tasks).where(tasks.c.id == new_id).values(done=True)
)
# DELETE
await conn.execute(delete(tasks).where(tasks.c.id == new_id))

The query objects are composable Python — select(tasks).where(...) builds an AST, not a string, so a typo in a column name (tasks.c.titel) is caught the moment it runs, and ty/mypy can flag the obvious mistakes statically. This is the Knex/Drizzle value proposition.

The ORM adds an identity map, change tracking (the “unit of work”), and relationship() navigation on top of Core. The modern 2.0 style uses Mapped[...] annotations and mapped_column() — fully typed, checked by ty/mypy.

// TypeORM entity
@Entity("tasks")
export class Task {
@PrimaryGeneratedColumn("increment")
id: number;
@Column({ length: 255 })
title: string;
@Column({ default: false })
done: boolean;
@CreateDateColumn()
createdAt: Date;
}

The Mapped[int] / Mapped[str] annotations are how SQLAlchemy 2.0 infers column types and gives you a typed attribute — task.title is a str to the type checker, task.done is a bool. Mapped[str | None] makes a column nullable. This is a genuine improvement over the old untyped Column(...) class attributes.

The session is the unit-of-work scope. You build it from an async_sessionmaker and open one per logical operation (per request, in a web app).

app/db.py
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
)
engine = create_async_engine("postgresql+asyncpg://dev:dev@localhost:5432/app")
# expire_on_commit=False keeps objects usable after commit (important for web responses)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
from sqlalchemy import select
async with SessionLocal() as session:
async with session.begin(): # one transaction; commits on clean exit
# CREATE
task = Task(title="Write docs")
session.add(task)
await session.flush() # assigns task.id without committing yet
# READ one — get by PK uses the identity map
task = await session.get(Task, 1)
# READ list
result = await session.scalars(
select(Task).where(Task.done == False).order_by(Task.id) # noqa: E712
)
tasks = result.all() # list[Task], fully typed
# UPDATE — just mutate; the unit of work flushes the change
async with session.begin():
task = await session.get(Task, 1)
task.done = True
# no explicit UPDATE — dirty tracking emits it at flush
# DELETE
async with session.begin():
task = await session.get(Task, 1)
await session.delete(task)

session.scalars(select(Task)) returns the Task objects (the ORM unwraps the single-entity rows for you); session.execute(select(Task)) returns Row tuples instead. Use scalars when you select whole entities.

Add a parent and a relationship():

app/models.py
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Project(Base):
__tablename__ = "projects"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(255))
tasks: Mapped[list["Task"]] = relationship(back_populates="project")
class Task(Base):
__tablename__ = "tasks"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(255))
project_id: Mapped[int | None] = mapped_column(ForeignKey("projects.id"))
project: Mapped["Project | None"] = relationship(back_populates="tasks")

The N+1 problem is the same one GORM and TypeORM have: loop over N projects, touch project.tasks on each, and you fire 1 + N queries. The fix is eager loading with selectinload (one extra query, batched by IN) or joinedload (a single JOIN):

from sqlalchemy.orm import selectinload
# 1 query for projects + 1 batched query for all their tasks = 2 total
result = await session.scalars(
select(Project).options(selectinload(Project.tasks))
)
for project in result:
print(project.name, len(project.tasks)) # no extra query
N+1 fixWhen to useSQL emitted
selectinload(rel)one-to-many, large child setsparent query + WHERE child.fk IN (...)
joinedload(rel)many-to-one, small fan-outsingle LEFT JOIN
.where(...).join(...) (manual)full control, reportingwhatever you write

SQLModel is from the author of FastAPI. One class is both a Pydantic v2 model (validation, serialization, OpenAPI schema) and a SQLAlchemy ORM model (a real table). For FastAPI CRUD this collapses the usual “DB model + request schema + response schema” triplet into far less code.

app/sqlmodels.py
from sqlmodel import SQLModel, Field
class Task(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
title: str = Field(max_length=255)
done: bool = Field(default=False)

table=True makes it a real table; the same class without it is a plain Pydantic model you can use as a request body. Querying uses SQLModel’s select() (a thin wrapper over SQLAlchemy’s) and the async session:

from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
async with AsyncSession(engine) as session:
task = Task(title="Write docs") # also validates as a Pydantic model
session.add(task)
await session.commit()
await session.refresh(task)
result = await session.exec(select(Task).where(Task.done == False)) # noqa: E712
for t in result.all():
print(t.title)
SQLAlchemy ORMSQLModel
Model definitionMapped[...] + mapped_columnone Field(...) per column
Validationnone (separate Pydantic schema)built-in (it is Pydantic)
FastAPI integrationmanual response modelsmodel doubles as request/response
Escape hatch to Corefullfull (it’s SQLAlchemy underneath)
Maturity / docs for advanced casesvery highthinner; you fall back to SQLAlchemy docs
FactorasyncpgSQLAlchemy CoreSQLAlchemy ORMSQLModel
Abstractionnone (raw SQL)typed query builderfull ORMORM + validation
Type safetymanualtyped columnsMapped[T]Mapped[T] + Pydantic
Boilerplatemedium (map rows)lowlowlowest
Performance ceilinghighesthighgood (object overhead)good
Complex/dynamic SQLeasy (write it)easy (compose it)possible (drop to Core)drop to Core
Rich domain + relationspainfulmanualexcellentgood
FastAPI request/responsemanual schemasmanual schemasmanual schemasbuilt in
Comparable topgx / postgres.jsKnex / Kysely / squirrelGORM / TypeORMPrisma-ish for FastAPI

Reach for raw asyncpg on hot paths, bulk loads (copy_records_to_table), and queries where you want to own every byte of SQL.

Reach for SQLAlchemy Core for reporting and dynamic query building without entity overhead.

Reach for the SQLAlchemy ORM for rich domains with relationships, where the identity map and unit-of-work pay for themselves.

Reach for SQLModel when you’re building FastAPI CRUD and want one model to serve as table, validation, and OpenAPI schema.

And yes, you can mix them over the same engine/pool: ORM for domain writes, Core or asyncpg for the reporting query.

Connection pooling, sessions, and the gotchas

Section titled “Connection pooling, sessions, and the gotchas”

The defaults are conservative. Size the pool to your concurrency, and remember the hard ceiling is Postgres’s max_connections (default 100) shared across all your app instances.

KnobSQLAlchemy async engineasyncpg poolRule of thumb
Steady-state sizepool_sizemin_sizematch expected concurrent queries
Burst headroommax_overflowmax_size - min_sizesmall buffer for spikes
Liveness checkpool_pre_ping=Truebuilt inon; cheap insurance vs. stale conns
Recyclepool_recycle=1800max_inactive_connection_lifetimeunder any proxy idle timeout

The modern idiom is async with session.begin() — it opens a transaction and commits on a clean exit, rolls back on exception. No manual commit()/rollback().

async with SessionLocal() as session:
async with session.begin():
session.add(Task(title="a"))
session.add(Task(title="b"))
# both committed together, or neither

Don’t share a session (or connection) across tasks

Section titled “Don’t share a session (or connection) across tasks”

This is the asyncio-specific footgun, and it bites Go developers who reach for the goroutine-style “fan out over a shared handle” pattern.

import asyncio
# WRONG — shared session across concurrent tasks
async def bad(session):
async with asyncio.TaskGroup() as tg:
tg.create_task(session.scalars(select(Task))) # boom: concurrent use
tg.create_task(session.scalars(select(Project)))
# RIGHT — a fresh session per task
async def fetch_tasks():
async with SessionLocal() as s:
return (await s.scalars(select(Task))).all()
async def fetch_projects():
async with SessionLocal() as s:
return (await s.scalars(select(Project))).all()
async def good():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_tasks())
t2 = tg.create_task(fetch_projects())
return t1.result(), t2.result()

Alembic is SQLAlchemy’s migration tool — the peer of Prisma Migrate, golang-migrate, or Rails migrations. It autogenerates migrations by diffing your models against the live database, and applies them up or down.

ToolStyleSchema source
Prisma Migrate (TS)schema-first, auto SQLschema.prisma
golang-migrate (Go)hand-written SQL files*.up.sql / *.down.sql
Alembic (Python)Python migration scripts, autogenerated from modelsyour Base.metadata
  1. Scaffold an async Alembic environment:

    Terminal window
    uv run alembic init -t async alembic

    The -t async template wires up an async engine in env.py — essential, since our engine is async.

  2. Point Alembic at your metadata. In alembic/env.py, set the target and the URL:

    alembic/env.py
    from app.models import Base
    target_metadata = Base.metadata
    # set the async URL (or read it from your settings / env)
    config.set_main_option(
    "sqlalchemy.url", "postgresql+asyncpg://dev:dev@localhost:5432/app"
    )
  3. Autogenerate a migration by diffing models vs. database:

    Terminal window
    uv run alembic revision --autogenerate -m "create tasks table"
  4. Apply it (and roll back if needed):

    Terminal window
    uv run alembic upgrade head # apply all pending
    uv run alembic downgrade -1 # undo the last one
    uv run alembic current # show applied revision

A generated migration is a Python module with upgrade() and downgrade():

alembic/versions/0001_create_tasks.py
def upgrade() -> None:
op.create_table(
"tasks",
sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
sa.Column("title", sa.String(length=255), nullable=False),
sa.Column("done", sa.Boolean(), server_default=sa.false(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
)
def downgrade() -> None:
op.drop_table("tasks")

The standard pattern (tie this to the FastAPI module): a single engine created at app startup via the lifespan handler, and a session-per-request injected with Depends.

app/main.py
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import (
create_async_engine, async_sessionmaker, AsyncSession,
)
from app.models import Task
engine = create_async_engine("postgresql+asyncpg://dev:dev@localhost:5432/app")
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
# startup: engine/pool is ready (created above)
yield
# shutdown: drain the pool cleanly
await engine.dispose()
app = FastAPI(lifespan=lifespan)
# Session-per-request dependency — one fresh session, closed after the response
async def get_session() -> AsyncIterator[AsyncSession]:
async with SessionLocal() as session:
yield session
@app.get("/tasks/{task_id}")
async def get_task(task_id: int, session: AsyncSession = Depends(get_session)):
task = await session.get(Task, task_id)
if task is None:
raise HTTPException(404, "task not found")
return task
@app.get("/tasks")
async def list_tasks(session: AsyncSession = Depends(get_session)):
return (await session.scalars(select(Task).order_by(Task.id))).all()

The key moves: the engine (and its pool) is a process-wide singleton created once and disposed in lifespan; each request gets its own session from Depends(get_session), which closes it (returning the connection to the pool) when the response is sent. This satisfies the “one session per task” rule automatically — each request is its own task.

Implement the same CRUD four ways against the shared Postgres, with an Alembic migration to create the table.