
Redis & Caching

You already know Redis from ioredis (TypeScript) or go-redis (Go) — set a key, read it back, slap a TTL on it, build a rate limiter out of INCR. None of that changes in Python. What changes is the client: you use the redis package (redis-py), and specifically its async client at redis.asyncio. It speaks the same await-everything dialect as the rest of your FastAPI app.

| Concern | ioredis (TS) | go-redis (Go) | redis-py async (Python) |
| --- | --- | --- | --- |
| Package | ioredis | github.com/redis/go-redis/v9 | redis (import redis.asyncio) |
| Client object | new Redis() | redis.NewClient(&Options{}) | redis.asyncio.from_url(...) |
| Connection model | single socket + auto-pipeline | pool | pool (ConnectionPool) |
| Async | native Promises | context.Context per call | async/await, coroutines |
| Cluster / Sentinel | built-in | built-in | RedisCluster, Sentinel |
| Serialization | manual (JSON) | manual (JSON) | manual (JSON via Pydantic) |
| Pipelines / transactions | pipeline() / multi() | Pipeline / TxPipeline | client.pipeline() |

There’s no Spring-@Cacheable-style framework annotation in the Python world. Caching is explicit: you write the get→miss→load→set yourself, or wrap it in a small decorator. That’s a feature — there’s no AOP proxy magic to debug, just a function.

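Add it to a uv project. redis[hiredis] pulls in the optional C reply parser, which speeds up response parsing, most noticeably on large multi-element replies (how much depends on your workload). Include it in production unless you have a specific reason not to.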

uv add "redis[hiredis]"

redis.asyncio.from_url is the one call you need. It does not open a socket eagerly — it builds a lazy ConnectionPool, and connections are created on first command. This is the equivalent of constructing a go-redis client or an ioredis instance: cheap, share one across your whole app.

import Redis from "ioredis";
// One shared client for the whole process. ioredis multiplexes commands over a single connection.
export const redis = new Redis("redis://localhost:6379", {
  maxRetriesPerRequest: 3,
});
await redis.set("user:1:name", "Alice");
const name = await redis.get("user:1:name"); // "Alice"

Don’t create a client per request and don’t reach for a module global you forget to close. Open the pool in the FastAPI lifespan (covered in Module 07) and stash it on app.state. This is the Python equivalent of wiring a singleton *redis.Client into your Go server struct or constructing ioredis once at module load.

app/main.py
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

import redis.asyncio as redis
from fastapi import FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Startup: build the pool once.
    app.state.redis = redis.from_url(
        "redis://localhost:6379",
        decode_responses=True,
        max_connections=20,
    )
    await app.state.redis.ping()  # fail fast if Redis is down
    yield
    # Shutdown: drain the pool.
    await app.state.redis.aclose()


app = FastAPI(lifespan=lifespan)


def get_redis(request: Request) -> redis.Redis:
    """FastAPI dependency: hand the shared client to route functions."""
    return request.app.state.redis

app/routes.py
from typing import Annotated

import redis.asyncio as redis
from fastapi import APIRouter, Depends

from app.main import get_redis

router = APIRouter()
RedisDep = Annotated[redis.Redis, Depends(get_redis)]


@router.get("/ping")
async def ping(r: RedisDep) -> dict[str, bool]:
    return {"ok": await r.ping()}

The command names are identical to what you type in redis-cli and nearly identical across all three clients — the only real difference is await and method casing. Here’s the type-by-type map:

| Redis type | Commands | ioredis | go-redis | redis-py async |
| --- | --- | --- | --- | --- |
| String | GET/SET/INCR | get/set/incr | Get/Set/Incr | get/set/incr |
| Hash | HSET/HGETALL | hset/hgetall | HSet/HGetAll | hset/hgetall |
| List | LPUSH/RPOP/LRANGE | lpush/rpop/lrange | LPush/RPop/LRange | lpush/rpop/lrange |
| Set | SADD/SMEMBERS | sadd/smembers | SAdd/SMembers | sadd/smembers |
| Sorted set | ZADD/ZRANGE | zadd/zrange | ZAdd/ZRange | zadd/zrange |
data types
# --- String + counter ---
await r.set("greeting", "hello", ex=60) # SET ... EX 60 (TTL in seconds)
await r.get("greeting") # "hello"
await r.incr("page:home:views") # atomic counter -> 1, 2, 3...
await r.setnx("lock:job", "1") # SET if Not eXists -> True/False
# --- Hash (a record) ---
await r.hset("user:1", mapping={"name": "Alice", "score": "100"})
await r.hgetall("user:1") # {"name": "Alice", "score": "100"}
await r.hincrby("user:1", "score", 5) # 105
# --- List (a FIFO queue: LPUSH to enqueue, RPOP to dequeue) ---
await r.lpush("jobs", "job-a", "job-b")
await r.rpop("jobs") # "job-a"
await r.lrange("jobs", 0, -1) # ["job-b"]
await r.brpop("jobs", timeout=5) # blocking RPOP (keeps FIFO order): returns key and value, or None after 5s
# --- Set (unique membership, set algebra) ---
await r.sadd("article:1:tags", "python", "redis")
await r.sismember("article:1:tags", "python") # True
await r.sinter("tag:python", "tag:redis") # articles with BOTH tags
# --- Sorted set (leaderboard: members scored, rankable) ---
await r.zadd("leaderboard", {"alice": 100, "bob": 80})
await r.zincrby("leaderboard", 5, "bob") # bob -> 85
await r.zrevrange("leaderboard", 0, 2, withscores=True) # top 3: [("alice", 100.0), ("bob", 85.0)] (str members with decode_responses=True)
await r.zrevrank("leaderboard", "bob") # 1 (0-based, 0 = highest)

Redis stores bytes. To cache a domain object you serialize it. The 2026-Python answer is Pydantic v2’s model_dump_json() / model_validate_json() (see Module 04) — it’s fast (Rust core), handles datetime/UUID/Decimal correctly, and validates on the way back out so a corrupt or schema-drifted cache entry fails loudly instead of silently.

serializing a model
from datetime import datetime

from pydantic import BaseModel


class Product(BaseModel):
    id: int
    name: str
    price: float
    created_at: datetime


async def cache_product(r, product: Product) -> None:
    # model_dump_json() -> compact str; round-trips datetime as ISO-8601.
    await r.set(f"product:{product.id}", product.model_dump_json(), ex=300)


async def load_product(r, product_id: int) -> Product | None:
    raw = await r.get(f"product:{product_id}")
    if raw is None:
        return None
    # Validates types on the way out: a malformed entry raises instead of lying.
    return Product.model_validate_json(raw)

Cache-aside (a.k.a. lazy loading) is the pattern you’ll use 90% of the time: on a read, check the cache; on a miss, load from the source and populate the cache with a TTL; on a write, invalidate. The app owns the cache — there’s no read-through proxy.

[Diagram: cache-aside read path]
async function getProduct(id: number): Promise<Product | null> {
  const cached = await redis.get(`product:${id}`);
  if (cached) return JSON.parse(cached);
  const product = await db.product.findUnique({ where: { id } });
  if (product) {
    await redis.set(`product:${id}`, JSON.stringify(product), "EX", 300);
  }
  return product;
}

Invalidation: write the DB, then delete the key


On a write, the safe default is delete the cache key, don’t update it. Deleting forces the next read to repopulate from the source of truth; updating risks writing a stale value if two writers race. (Updating in place — write-through — is fine when you control the write path and want a warm cache; more below.)

invalidate on write
async def update_product(r, db, product_id: int, patch: ProductPatch) -> Product:
    product = await save_to_db(db, product_id, patch)
    await r.delete(f"product:{product_id}")  # next read repopulates
    return product

Once you’ve written cache-aside twice, wrap it. A decorator turns any expensive async function into a cached one — this is the closest Python gets to Spring’s @Cacheable, except it’s twelve lines you can actually read. It keys on the function name plus arguments and stores the JSON.

app/cache.py
import functools
import json
from collections.abc import Awaitable, Callable
from typing import Any

import redis.asyncio as redis


def cached(r: redis.Redis, *, ttl: int, prefix: str = "cache"):
    """Cache-aside decorator for async functions returning JSON-able values."""

    def decorator[**P, T](fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        @functools.wraps(fn)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # Build a stable key from positional AND keyword arguments.
            # Sorting kwargs makes f(a=1, b=2) and f(b=2, a=1) share a key.
            parts = [*map(str, args), *(f"{k}={v}" for k, v in sorted(kwargs.items()))]
            key = f"{prefix}:{fn.__name__}:" + ":".join(parts)
            hit = await r.get(key)
            if hit is not None:
                return json.loads(hit)
            result = await fn(*args, **kwargs)
            await r.set(key, json.dumps(result, default=str), ex=ttl)
            return result

        return wrapper

    return decorator


# Usage: PEP 695 generics ([**P, T], Python 3.12+) keep the wrapper fully typed.
# `client` is the shared redis.asyncio client created at startup.
@cached(client, ttl=300, prefix="reports")
async def expensive_report(year: int, region: str) -> dict[str, Any]:
    ...  # a 2-second aggregation query

Write-through updates the cache and the database on every write, so a key stays warm as long as it is written at least once per TTL. You trade write latency (two writes) for far fewer cold reads. Use it for hot, read-heavy keys you control:

write-through
async def set_product(r, db, product: Product) -> Product:
    saved = await save_to_db(db, product)
    # Write the fresh value straight into the cache instead of deleting.
    await r.set(f"product:{saved.id}", saved.model_dump_json(), ex=300)
    return saved

The risk is the inverse of delete-on-write: if the cache write succeeds but a later DB write in the same logical operation fails, or two writers race, the cache can hold a value the DB never committed. Cache-aside-with-delete is the safer default; reach for write-through only when the warm-cache win is worth the coordination.

Rate limiting is the other canonical Redis job. The counter lives in Redis so every instance of your service shares one limit per client — which is the whole point, and what an in-memory express-rate-limit or golang.org/x/time/rate limiter can’t do across a fleet.

The non-negotiable property: the check must be atomic. A read-then-write race (read count → decide → increment) lets two concurrent requests both see “9 of 10” and both proceed. The fix is either a single atomic command (INCR) or a Lua script, which Redis runs as one indivisible operation.

| Algorithm | Data structure | Accuracy | Cost | Notes |
| --- | --- | --- | --- | --- |
| Fixed window | INCR + EXPIRE | low (boundary burst) | 1 key | cheapest; allows 2× at edges |
| Sliding window | sorted set of timestamps | high | O(log n) per req | precise; what most APIs want |
| Token bucket | hash (tokens + last-refill) | high, allows bursts | small Lua | smooth refill + burst capacity |

Bucket the clock into slices of window seconds, INCR the bucket, set EXPIRE on the first hit. Cheap, but a client can fire limit requests at the end of one window and limit more at the start of the next — 2 × limit in a heartbeat.

app/ratelimit.py — fixed window
import time

FIXED_WINDOW_LUA = """
local count = redis.call('INCR', KEYS[1])
if count == 1 then
  redis.call('EXPIRE', KEYS[1], ARGV[1])
end
local ttl = redis.call('TTL', KEYS[1])
return {count, ttl}
"""


async def fixed_window_allow(r, client_id: str, limit: int, window: int) -> bool:
    bucket = int(time.time()) // window
    key = f"rate:fixed:{client_id}:{bucket}"
    count, _ttl = await r.eval(FIXED_WINDOW_LUA, 1, key, window)
    return count <= limit
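
To make the boundary burst concrete, here is a small in-memory model of the same bucketing arithmetic. It is an illustration only: `FixedWindowModel` is a hypothetical stand-in for the Redis counter (no server, no key expiry), and timestamps are passed in explicitly so the run is deterministic.

```python
from collections import Counter


class FixedWindowModel:
    """In-memory model of the INCR-per-bucket scheme (no Redis, no expiry)."""

    def __init__(self, limit: int, window: int) -> None:
        self.limit = limit
        self.window = window
        self.counts: Counter[tuple[str, int]] = Counter()

    def allow(self, client_id: str, now: float) -> bool:
        bucket = int(now) // self.window       # same bucketing as the Redis key
        self.counts[(client_id, bucket)] += 1  # stands in for INCR
        return self.counts[(client_id, bucket)] <= self.limit


limiter = FixedWindowModel(limit=10, window=60)
# 10 requests in the last second of one window, 10 in the first second of the next.
late = [limiter.allow("client-a", now=119.0) for _ in range(10)]
early = [limiter.allow("client-a", now=120.0) for _ in range(10)]
over = limiter.allow("client-a", now=120.5)

print(sum(late) + sum(early))  # 20 requests admitted within about one second
print(over)                    # False: the 11th request in the new window
```

With limit=10 the client gets 20 requests through across the 119 s / 120 s boundary, which is the 2 × limit edge case in practice.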

Keep one sorted set per client where each member is a request and its score is the timestamp. On each request: drop members older than the window (ZREMRANGEBYSCORE), count what’s left (ZCARD), reject if at limit, else add the request and refresh the TTL. Precise, and the whole sequence is one atomic Lua script.

app/ratelimit.py — sliding window
import time
import uuid

SLIDING_WINDOW_LUA = """
-- KEYS[1] = zset key
-- ARGV: now_ms, window_ms, limit, member, ttl_seconds
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - window)
local count = redis.call('ZCARD', KEYS[1])
if count >= limit then
  local oldest = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
  return {0, oldest[2]}
end
redis.call('ZADD', KEYS[1], now, ARGV[4])
redis.call('EXPIRE', KEYS[1], ARGV[5])
return {1, 0}
"""


async def sliding_window_allow(r, client_id: str, limit: int, window: int) -> bool:
    now_ms = int(time.time() * 1000)
    member = f"{now_ms}-{uuid.uuid4()}"  # unique: two reqs in the same ms must not collide
    allowed, _oldest = await r.eval(
        SLIDING_WINDOW_LUA, 1,
        f"rate:sliding:{client_id}",
        now_ms, window * 1000, limit, member, window,
    )
    return bool(allowed)
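
The script's steps can be checked without a server using a small in-memory model. This is a sketch under stated assumptions: `SlidingWindowModel` is hypothetical, tracks a single client, and uses a deque of timestamps in place of the sorted set. Each step is labelled with the Redis command it mirrors.

```python
from collections import deque


class SlidingWindowModel:
    """In-memory model of the sorted-set steps (one client, no Redis)."""

    def __init__(self, limit: int, window_ms: int) -> None:
        self.limit = limit
        self.window_ms = window_ms
        self.hits: deque[int] = deque()  # request timestamps, oldest first

    def allow(self, now_ms: int) -> bool:
        # ZREMRANGEBYSCORE key 0 (now - window): drop expired timestamps.
        while self.hits and self.hits[0] <= now_ms - self.window_ms:
            self.hits.popleft()
        # ZCARD plus the limit check.
        if len(self.hits) >= self.limit:
            return False
        # ZADD: record this request.
        self.hits.append(now_ms)
        return True


limiter = SlidingWindowModel(limit=10, window_ms=60_000)
burst_a = [limiter.allow(119_000) for _ in range(10)]  # all admitted
burst_b = [limiter.allow(120_000) for _ in range(10)]  # all rejected: window still full
later = limiter.allow(179_000)                         # admitted: first burst aged out
```

A burst that straddles a minute boundary gets no extra allowance here, because the window is measured back from each request rather than from a fixed clock bucket.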

A bucket holds up to capacity tokens, refilled at rate tokens/second. Each request spends one token; an empty bucket rejects. It allows bursts (up to capacity) while enforcing a long-run average — the model behind golang.org/x/time/rate. Store tokens and last_refill in a hash and reconcile lazily in Lua:

Lua: token bucket
-- KEYS[1] = bucket hash
-- ARGV: capacity, refill_per_sec, now, requested
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local want = tonumber(ARGV[4])
local data = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(data[1]) or capacity
local ts = tonumber(data[2]) or now
tokens = math.min(capacity, tokens + (now - ts) * refill) -- lazy refill
local allowed = tokens >= want
if allowed then tokens = tokens - want end
redis.call('HSET', KEYS[1], 'tokens', tokens, 'ts', now) -- HSET takes multiple pairs; HMSET is deprecated
redis.call('EXPIRE', KEYS[1], math.ceil(capacity / refill) * 2)
return allowed and 1 or 0

In Express you’d register rate-limit middleware globally; in Go you’d wrap your http.Handler. FastAPI’s idiomatic unit is a dependency — it runs before the route, can read the request, and can short-circuit by raising HTTPException. A dependency targets specific routes precisely; a middleware applies to everything.

import rateLimit from "express-rate-limit"; // in-memory by default — NOT shared across instances
app.use("/api", rateLimit({ windowMs: 60_000, max: 10 }));
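
On the Python side, one way to structure this is a small framework-agnostic check that the FastAPI dependency calls. The sketch below is an assumption-laden illustration: `RateLimitExceeded` and `enforce_rate_limit` are hypothetical names, `allow` is any limiter with the `(r, client_id, limit, window)` shape used by the limiter functions above, and the FastAPI wiring is shown only as comments so the block runs without the framework installed.

```python
from collections.abc import Awaitable, Callable
from typing import Any


class RateLimitExceeded(Exception):
    """Hypothetical error that the web layer turns into HTTP 429."""

    def __init__(self, retry_after: int) -> None:
        super().__init__(f"rate limit exceeded, retry in {retry_after}s")
        self.retry_after = retry_after


# Any limiter with this shape works, e.g. a fixed- or sliding-window check.
Allow = Callable[[Any, str, int, int], Awaitable[bool]]


async def enforce_rate_limit(
    r: Any, client_id: str, *, allow: Allow, limit: int = 10, window: int = 60
) -> None:
    """Raise RateLimitExceeded when the limiter says no; return None otherwise."""
    if not await allow(r, client_id, limit, window):
        # The full window is a conservative upper bound for Retry-After.
        raise RateLimitExceeded(retry_after=window)


# FastAPI wiring (not executed here; requires fastapi):
#
#   async def limit_dep(request: Request, r: RedisDep) -> None:
#       client_id = request.client.host if request.client else "unknown"
#       try:
#           await enforce_rate_limit(r, client_id, allow=sliding_window_allow)
#       except RateLimitExceeded as exc:
#           raise HTTPException(
#               status_code=429,
#               detail="Too Many Requests",
#               headers={"Retry-After": str(exc.retry_after)},
#           )
#
#   @router.get("/search", dependencies=[Depends(limit_dep)])
#   async def search() -> dict[str, str]: ...
```

Keeping the check separate from the HTTP exception makes the limiter testable without spinning up an app, and lets the same check back a middleware if a global limit is wanted later.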

Redis Pub/Sub is fire-and-forget real-time messaging: publish to a channel, every current subscriber gets it, and if nobody’s listening the message vanishes. It’s perfect for cache invalidation across instances and live UI nudges — and wrong for anything that must not be lost.

The async client gives you a pubsub() object you consume with async for — the same async for you learned for streams in Module 06.

const pub = new Redis();
await pub.publish("invalidate", JSON.stringify({ key: "product:1" }));
const sub = new Redis(); // a subscriber connection is dedicated — can't run other cmds
sub.subscribe("invalidate");
sub.on("message", (_chan, msg) => cache.delete(JSON.parse(msg).key));
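
A Python sketch of the same publish/subscribe pair. It assumes `r` is the shared redis.asyncio client with decode_responses=True and that `local_cache` is a hypothetical in-process dict you want to keep in sync across instances. The function and channel names are illustrative.

```python
import json
from typing import Any


async def publish_invalidation(r: Any, key: str, channel: str = "invalidate") -> int:
    """Tell every live subscriber to drop `key`; returns the receiver count."""
    return await r.publish(channel, json.dumps({"key": key}))


async def listen_for_invalidations(
    r: Any, local_cache: dict[str, Any], channel: str = "invalidate"
) -> None:
    """Evict keys from an in-process cache as invalidation messages arrive."""
    async with r.pubsub() as pubsub:  # dedicated subscriber connection
        await pubsub.subscribe(channel)
        async for message in pubsub.listen():
            if message["type"] != "message":  # skip subscribe confirmations
                continue
            key = json.loads(message["data"])["key"]
            local_cache.pop(key, None)
```

Against a real server `listen()` keeps yielding until the subscription ends, so the listener would normally run as a background task (for example `asyncio.create_task(...)` started in the lifespan) and be cancelled on shutdown.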

Pub/Sub drops messages with no live subscriber. Redis Streams (XADD / XREAD / consumer groups via XREADGROUP) is the durable, replayable cousin: messages persist, late consumers can read history, and consumer groups give you Kafka-style partitioned, acknowledged consumption with redelivery of un-acked messages. It’s a genuine lightweight event log.

But if you need durable events as a first-class architectural concern — ordering guarantees, long retention, many consumer groups, real throughput — reach for Kafka, covered next in Module 11. Rule of thumb: Streams for lightweight intra-app work queues you already have Redis for; Kafka when the event log is the system.
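
For the lightweight work-queue case, a consumer-group read loop body might look like the sketch below. It assumes `r` is the redis.asyncio client using the default RESP2 reply shape (a list of `[stream, [(id, fields), ...]]` pairs) with decode_responses=True. `process_batch` and `handle` are hypothetical names.

```python
from collections.abc import Awaitable, Callable
from typing import Any


async def process_batch(
    r: Any,
    stream: str,
    group: str,
    consumer: str,
    handle: Callable[[dict[str, str]], Awaitable[None]],  # hypothetical job handler
) -> int:
    """Read up to 10 new entries for this consumer and ack the ones that succeed."""
    # ">" asks for entries never delivered to any consumer in this group.
    reply = await r.xreadgroup(group, consumer, {stream: ">"}, count=10, block=5000)
    acked = 0
    for _stream_name, entries in reply or []:
        for entry_id, fields in entries:
            try:
                await handle(fields)
            except Exception:
                continue  # not acked: the entry stays pending for this group
            await r.xack(stream, group, entry_id)
            acked += 1
    return acked
```

The group has to exist first, e.g. `await r.xgroup_create(stream, group, id="0", mkstream=True)` once at startup, and producers append with `await r.xadd(stream, {...})`. Entries that were never acked remain in the group's pending list until a consumer re-reads or claims them.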

A distributed lock coordinates “only one worker does X at a time” across instances. The primitive is SET key token NX EX ttl: set-if-not-exists, with a TTL so a crashed holder doesn’t deadlock the lock forever. Release by deleting only if you still own it — checked atomically in Lua, because a naive GET-then-DEL can delete a lock another worker acquired after yours expired.

app/lock.py
import uuid
from contextlib import asynccontextmanager

RELEASE_LUA = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
  return redis.call('DEL', KEYS[1])
end
return 0
"""


@asynccontextmanager
async def redis_lock(r, name: str, ttl: int = 10):
    token = str(uuid.uuid4())  # proves ownership on release
    acquired = await r.set(f"lock:{name}", token, nx=True, ex=ttl)
    if not acquired:
        raise RuntimeError(f"could not acquire lock {name}")
    try:
        yield
    finally:
        await r.eval(RELEASE_LUA, 1, f"lock:{name}", token)  # release iff still ours

The flip side of locking: make a request safe to retry. The client sends an Idempotency-Key header; you SET NX it before doing the work and return the stored response on a replay. This is how Stripe-style “retrying a charge won’t double-charge” works, and it’s a handful of Redis commands.

idempotency dependency
from fastapi import HTTPException, Request


async def idempotent(request: Request) -> str | None:
    key = request.headers.get("idempotency-key")
    if key is None:
        return None
    r = request.app.state.redis
    # Reserve the key atomically; SET NX returns None if the key already exists.
    first = await r.set(f"idem:{key}", "in-progress", nx=True, ex=86400)
    if not first:
        # Duplicate: report what is stored ("in-progress" or the saved response).
        cached = await r.get(f"idem:{key}")
        raise HTTPException(409, detail=f"duplicate request (status: {cached})")
    return key
# On success, the handler overwrites idem:{key} with the serialized response.
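
The full reserve, run, store and replay cycle can be sketched independently of the web framework. This is one possible shape, not the only one: `run_idempotent`, `RequestInFlight`, and `work` are hypothetical names, `r` is assumed to be the redis.asyncio client with decode_responses=True, and the response is assumed to be a JSON-able dict.

```python
import json
from collections.abc import Awaitable, Callable
from typing import Any

IN_PROGRESS = "in-progress"


class RequestInFlight(Exception):
    """Hypothetical error: the same key is still being processed (map to HTTP 409)."""


async def run_idempotent(
    r: Any,
    idem_key: str,
    work: Callable[[], Awaitable[dict[str, Any]]],
    ttl: int = 86400,
) -> dict[str, Any]:
    redis_key = f"idem:{idem_key}"
    # Reserve atomically: only the first request with this key gets through.
    first = await r.set(redis_key, IN_PROGRESS, nx=True, ex=ttl)
    if not first:
        stored = await r.get(redis_key)
        if stored is None or stored == IN_PROGRESS:
            raise RequestInFlight(idem_key)
        return json.loads(stored)  # replay: hand back the original response
    try:
        response = await work()
    except Exception:
        await r.delete(redis_key)  # release the reservation so a retry can run
        raise
    await r.set(redis_key, json.dumps(response), ex=ttl)
    return response
```

Deleting the reservation on failure is a design choice: it lets the client retry after an error, at the cost of re-running work that may have partially completed.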

When a hot key expires, every concurrent request misses at once and stampedes the database with the same expensive query — the thundering herd. The fix is single-flight: only one caller recomputes; the rest wait for that result. In Go this is golang.org/x/sync/singleflight; in Python you build it with an asyncio.Lock per key (in-process) plus a short Redis SET NX lock (cross-process).

single-flight cache-aside
import asyncio

# One asyncio.Lock per cache key. This dict grows with the number of distinct
# keys, so evict idle entries in long-lived workers.
_locks: dict[str, asyncio.Lock] = {}


async def get_single_flight(r, db, product_id: int) -> Product | None:
    key = f"product:{product_id}"
    cached = await r.get(key)
    if cached is not None:
        return Product.model_validate_json(cached)
    # Per-key in-process lock: collapse concurrent misses in THIS worker to one load.
    lock = _locks.setdefault(key, asyncio.Lock())
    async with lock:
        cached = await r.get(key)  # double-check: someone may have filled it
        if cached is not None:
            return Product.model_validate_json(cached)
        product = await load_from_db(db, product_id)
        if product is not None:
            await r.set(key, product.model_dump_json(), ex=300)
        return product

For cross-instance herds, add jitter to your TTLs (ex=300 + random.randint(0, 30)) so a million keys set in the same deploy don’t all expire in the same second, and/or gate the recompute with the Redis SET NX lock from above.

| Gotcha | What bites you | Discipline |
| --- | --- | --- |
| Stale cache | serving old data after a write | delete key after DB commit; short TTL as a backstop |
| No TTL | keys live forever, Redis OOMs | every cache key gets an ex=; reserve no-TTL for durable state |
| TTL stampede | mass-set keys expire together | add random jitter to TTLs |
| bytes vs str | if value == "x" silently false | decode_responses=True (or decode explicitly) |
| Empty vs missing | []/{} are falsy like missing | check await r.exists(k) when it matters |
| pickle | RCE from a poisoned entry | JSON only across trust boundaries |
| Caching None | one miss poisons the cache for the TTL | decide explicitly whether negatives are cached, and use a short TTL if so |

Wire cache-aside and a sliding-window limiter onto a real FastAPI service against the shared Redis instance.