Redis & Caching
You already know Redis from ioredis (TypeScript) or go-redis (Go) — set a key,
read it back, slap a TTL on it, build a rate limiter out of INCR. None of that
changes in Python. What changes is the client: you use the redis package
(redis-py), and specifically its async client at redis.asyncio. It speaks the
same await-everything dialect as the rest of your FastAPI app.
The client landscape
| Concern | ioredis (TS) | go-redis (Go) | redis-py async (Python) |
|---|---|---|---|
| Package | ioredis | github.com/redis/go-redis/v9 | redis (import redis.asyncio) |
| Client object | new Redis() | redis.NewClient(&Options{}) | redis.asyncio.from_url(...) |
| Connection model | single socket + auto-pipeline | pool | pool (ConnectionPool) |
| Async | native Promises | context.Context per call | async/await, coroutines |
| Cluster / Sentinel | built-in | built-in | RedisCluster, Sentinel |
| Serialization | manual (JSON) | manual (JSON) | manual (JSON via Pydantic) |
| Pipelines / transactions | pipeline() / multi() | Pipeline / TxPipeline | client.pipeline() |
There’s no Spring-@Cacheable-style framework annotation in the Python world.
Caching is explicit: you write the get→miss→load→set yourself, or wrap it in a
small decorator. That’s a feature — there’s no AOP proxy magic to debug, just a
function.
Installing and connecting
Add it to a uv project. redis[hiredis] pulls in the optional C parser, which
speeds up reply parsing, most noticeably on large replies, so include it in production.
```shell
uv add "redis[hiredis]"
```
from_url + the connection pool
redis.asyncio.from_url is the one call you need. It does not open a socket
eagerly — it builds a lazy ConnectionPool, and connections are created on first
command. This is the equivalent of constructing a go-redis client or an ioredis
instance: cheap, share one across your whole app.
```typescript
import Redis from "ioredis";

// One shared client for the whole process. ioredis multiplexes commands over a single connection.
export const redis = new Redis("redis://localhost:6379", {
  maxRetriesPerRequest: 3,
});

await redis.set("user:1:name", "Alice");
const name = await redis.get("user:1:name"); // "Alice"
```

```go
import "github.com/redis/go-redis/v9"

// PoolSize defaults to 10 * GOMAXPROCS.
var rdb = redis.NewClient(&redis.Options{
	Addr:     "localhost:6379",
	PoolSize: 20,
})

rdb.Set(ctx, "user:1:name", "Alice", 0)
name, _ := rdb.Get(ctx, "user:1:name").Result() // "Alice"
```

```python
import redis.asyncio as redis

# Lazy pool: no socket opened until the first command.
# decode_responses=True returns str instead of bytes (skip it for binary values).
client = redis.from_url(
    "redis://localhost:6379",
    decode_responses=True,
    max_connections=20,
)

await client.set("user:1:name", "Alice")
name = await client.get("user:1:name")  # "Alice" (str, not b"Alice")
```
Lifespan integration with FastAPI
Don’t create a client per request and don’t reach for a module global you forget to
close. Open the pool in the FastAPI lifespan (covered in
Module 07) and stash it on app.state. This is the
Python equivalent of wiring a singleton *redis.Client into your Go server struct
or constructing ioredis once at module load.
```python
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

import redis.asyncio as redis
from fastapi import FastAPI, Request


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    # Startup: build the pool once.
    app.state.redis = redis.from_url(
        "redis://localhost:6379",
        decode_responses=True,
        max_connections=20,
    )
    await app.state.redis.ping()  # fail fast if Redis is down
    yield
    # Shutdown: drain the pool.
    await app.state.redis.aclose()


app = FastAPI(lifespan=lifespan)


def get_redis(request: Request) -> redis.Redis:
    """FastAPI dependency: hand the shared client to route functions."""
    return request.app.state.redis
```

```python
from typing import Annotated

import redis.asyncio as redis
from fastapi import APIRouter, Depends

from app.main import get_redis

router = APIRouter()
RedisDep = Annotated[redis.Redis, Depends(get_redis)]


@router.get("/ping")
async def ping(r: RedisDep) -> dict[str, bool]:
    return {"ok": await r.ping()}
```
Core data types & commands
The command names are identical to what you type in redis-cli and nearly
identical across all three clients — the only real difference is await and method
casing. Here’s the type-by-type map:
| Redis type | Commands | ioredis | go-redis | redis-py async |
|---|---|---|---|---|
| String | GET/SET/INCR | get/set/incr | Get/Set/Incr | get/set/incr |
| Hash | HSET/HGETALL | hset/hgetall | HSet/HGetAll | hset/hgetall |
| List | LPUSH/RPOP/LRANGE | lpush/rpop/lrange | LPush/RPop/LRange | lpush/rpop/lrange |
| Set | SADD/SMEMBERS | sadd/smembers | SAdd/SMembers | sadd/smembers |
| Sorted set | ZADD/ZRANGE | zadd/zrange | ZAdd/ZRange | zadd/zrange |
Strings, hashes, lists, sets, sorted sets
```python
# --- String + counter ---
await r.set("greeting", "hello", ex=60)   # SET ... EX 60 (TTL in seconds)
await r.get("greeting")                   # "hello"
await r.incr("page:home:views")           # atomic counter -> 1, 2, 3...
await r.setnx("lock:job", "1")            # SET if Not eXists -> True/False

# --- Hash (a record) ---
await r.hset("user:1", mapping={"name": "Alice", "score": "100"})
await r.hgetall("user:1")                 # {"name": "Alice", "score": "100"}
await r.hincrby("user:1", "score", 5)     # 105

# --- List (a FIFO queue: LPUSH to enqueue, RPOP to dequeue) ---
await r.lpush("jobs", "job-a", "job-b")
await r.rpop("jobs")                      # "job-a"
await r.lrange("jobs", 0, -1)             # ["job-b"]
await r.brpop("jobs", timeout=5)          # blocking RPOP (same end as rpop), waits up to 5s

# --- Set (unique membership, set algebra) ---
await r.sadd("article:1:tags", "python", "redis")
await r.sismember("article:1:tags", "python")   # True
await r.sinter("tag:python", "tag:redis")       # articles with BOTH tags

# --- Sorted set (leaderboard: members scored, rankable) ---
await r.zadd("leaderboard", {"alice": 100, "bob": 80})
await r.zincrby("leaderboard", 5, "bob")                  # bob -> 85
await r.zrevrange("leaderboard", 0, 2, withscores=True)   # top 3: [("alice", 100.0), ...] (str with decode_responses=True)
await r.zrevrank("leaderboard", "bob")                    # 1 (0-based, 0 = highest)
```
Serialization: JSON via Pydantic
Redis stores bytes. To cache a domain object you serialize it. The 2026-Python
answer is Pydantic v2’s model_dump_json() / model_validate_json() (see
Module 04) — it’s fast (Rust core), handles
datetime/UUID/Decimal correctly, and validates on the way back out so a
corrupt or schema-drifted cache entry fails loudly instead of silently.
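To see why the standard library alone falls short here, the following stdlib-only sketch may help. The product dict is a hypothetical record invented for illustration. It shows json.dumps rejecting Decimal and datetime values, and the common default=str workaround handing back plain strings instead of the original types:

```python
import json
from datetime import datetime, timezone
from decimal import Decimal

# Hypothetical record, used only for illustration.
product = {
    "id": 1,
    "price": Decimal("9.99"),
    "created_at": datetime(2026, 1, 1, tzinfo=timezone.utc),
}

try:
    json.dumps(product)
    plain_json_ok = True
except TypeError:
    # The stdlib encoder has no rule for Decimal or datetime.
    plain_json_ok = False

# default=str makes the dump succeed, but the types are lost on the way back:
# price and created_at come back as str, and nothing validates them.
restored = json.loads(json.dumps(product, default=str))
```

A Pydantic model closes that gap by restoring each field to its declared type and rejecting values that no longer fit.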
```python
from datetime import datetime

from pydantic import BaseModel


class Product(BaseModel):
    id: int
    name: str
    price: float
    created_at: datetime


async def cache_product(r, product: Product) -> None:
    # model_dump_json() -> compact str; round-trips datetime as ISO-8601.
    await r.set(f"product:{product.id}", product.model_dump_json(), ex=300)


async def load_product(r, product_id: int) -> Product | None:
    raw = await r.get(f"product:{product_id}")
    if raw is None:
        return None
    # Validates types on the way out: a malformed entry raises, doesn't lie.
    return Product.model_validate_json(raw)
```
Cache-aside
Cache-aside (a.k.a. lazy loading) is the pattern you’ll use 90% of the time: on a read, check the cache; on a miss, load from the source and populate the cache with a TTL; on a write, invalidate. The app owns the cache; there’s no read-through proxy.
```mermaid
flowchart TB
    A["read(id)"] --> B["GET cache:id"]
    B -->|"hit"| C["return cached value (no DB hit)"]
    B -->|"miss"| D["load from DB"]
    D --> E["SET cache:id value EX ttl"]
    E --> F["return value"]
```
```typescript
async function getProduct(id: number): Promise<Product | null> {
  const cached = await redis.get(`product:${id}`);
  if (cached) return JSON.parse(cached);

  const product = await db.product.findUnique({ where: { id } });
  if (product) {
    await redis.set(`product:${id}`, JSON.stringify(product), "EX", 300);
  }
  return product;
}
```

```go
func GetProduct(ctx context.Context, id int) (*Product, error) {
	key := fmt.Sprintf("product:%d", id)
	if raw, err := rdb.Get(ctx, key).Result(); err == nil {
		var p Product
		if json.Unmarshal([]byte(raw), &p) == nil {
			return &p, nil // cache hit
		}
		// Corrupt entry: treat it as a miss and reload.
	}
	// redis.Nil means miss; fall through

	p, err := loadFromDB(ctx, id)
	if err != nil || p == nil {
		return p, err
	}
	raw, _ := json.Marshal(p)
	rdb.Set(ctx, key, raw, 5*time.Minute)
	return p, nil
}
```

```python
async def get_product(r, db, product_id: int) -> Product | None:
    key = f"product:{product_id}"

    cached = await r.get(key)
    if cached is not None:  # cache hit
        return Product.model_validate_json(cached)

    product = await load_from_db(db, product_id)  # cache miss -> load
    if product is not None:
        await r.set(key, product.model_dump_json(), ex=300)  # populate w/ TTL
    return product
```
Invalidation: write the DB, then delete the key
On a write, the safe default is to delete the cache key, not update it. Deleting forces the next read to repopulate from the source of truth; updating risks writing a stale value if two writers race. (Updating in place, known as write-through, is fine when you control the write path and want a warm cache; more below.)
```python
async def update_product(r, db, product_id: int, patch: ProductPatch) -> Product:
    product = await save_to_db(db, product_id, patch)
    await r.delete(f"product:{product_id}")  # next read repopulates
    return product
```
A reusable cache decorator
Once you’ve written cache-aside twice, wrap it. A decorator turns any expensive
async function into a cached one. This is the closest Python gets to Spring’s
@Cacheable, except it’s a short function you can actually read. It keys on the function
name plus arguments and stores the JSON.
```python
import functools
import json
from collections.abc import Awaitable, Callable
from typing import Any

import redis.asyncio as redis


def cached(r: redis.Redis, *, ttl: int, prefix: str = "cache"):
    """Cache-aside decorator for async functions returning JSON-able values."""

    def decorator[**P, T](fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
        @functools.wraps(fn)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # Build a stable key from the call signature. Keyword arguments are
            # included (sorted) so calls that differ only in kwargs don't collide.
            parts = [*map(str, args), *(f"{k}={v}" for k, v in sorted(kwargs.items()))]
            key = f"{prefix}:{fn.__name__}:" + ":".join(parts)

            hit = await r.get(key)
            if hit is not None:
                return json.loads(hit)

            result = await fn(*args, **kwargs)
            await r.set(key, json.dumps(result, default=str), ex=ttl)
            return result

        return wrapper

    return decorator


# Usage: PEP 695 generics ([**P, T], Python 3.12+) keep the wrapper fully typed.
@cached(client, ttl=300, prefix="reports")
async def expensive_report(year: int, region: str) -> dict[str, Any]:
    ...  # a 2-second aggregation query
```
Write-through, briefly
Write-through updates the cache and the database on every write, keeping the cache permanently warm. You trade write latency (two writes) for never serving a cold read. Use it for hot, read-heavy keys you control:
```python
async def set_product(r, db, product: Product) -> Product:
    saved = await save_to_db(db, product)
    # Write the fresh value straight into the cache instead of deleting.
    await r.set(f"product:{saved.id}", saved.model_dump_json(), ex=300)
    return saved
```

The risk is the inverse of delete-on-write: if the cache write succeeds but a later DB write in the same logical operation fails, or two writers race, the cache can hold a value the DB never committed. Cache-aside-with-delete is the safer default; reach for write-through only when the warm-cache win is worth the coordination.
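The two-writer race is easier to see when the interleaving is replayed step by step. The sketch below is an illustration only: plain dicts stand in for the database and Redis, and the replay helper, the step names, and the key product:1 are all hypothetical. Writer A commits v1, writer B commits v2, but B reaches the cache before A does:

```python
def replay(interleaving, on_write):
    """Replay two writers' steps in a fixed order against dict-backed stand-ins."""
    db: dict[str, str] = {}
    cache: dict[str, str] = {}
    for step, value in interleaving:
        if step == "db":
            db["product:1"] = value
        else:  # "cache" step: apply the chosen cache policy
            on_write(cache, "product:1", value)
    return db, cache


def write_through(cache, key, value):
    cache[key] = value  # update the cached value in place


def delete_on_write(cache, key, value):
    cache.pop(key, None)  # invalidate; the next read repopulates from the DB


# A commits first and B second, but B's cache step runs before A's.
INTERLEAVING = [("db", "v1"), ("db", "v2"), ("cache", "v2"), ("cache", "v1")]

db_wt, cache_wt = replay(INTERLEAVING, write_through)
db_del, cache_del = replay(INTERLEAVING, delete_on_write)
```

Under write-through the database ends on v2 while the cache is stuck on v1 until the TTL expires. Under delete-on-write the cache simply ends up empty, so the next read loads v2.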
Rate limiting
Rate limiting is the other canonical Redis job. The counter lives in Redis so
every instance of your service shares one limit per client — which is the whole
point, and what an in-memory express-rate-limit or golang.org/x/time/rate
limiter can’t do across a fleet.
The non-negotiable property: the check must be atomic. A read-then-write race
(read count → decide → increment) lets two concurrent requests both see “9 of 10”
and both proceed. The fix is either a single atomic command (INCR) or a Lua
script, which Redis runs as one indivisible operation.
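The race described above can be reproduced without Redis. This is a minimal sketch, assuming an in-memory Counter class (hypothetical, not part of any client) whose methods yield to the event loop the way a network round trip would. Two requests arrive while the count is 9 of 10:

```python
import asyncio

LIMIT = 10


class Counter:
    """In-memory stand-in for a Redis counter (hypothetical, for illustration)."""

    def __init__(self, start: int) -> None:
        self.value = start

    async def get(self) -> int:
        await asyncio.sleep(0)  # yield, like a network round trip
        return self.value

    async def set(self, value: int) -> None:
        await asyncio.sleep(0)
        self.value = value

    async def incr(self) -> int:
        await asyncio.sleep(0)
        self.value += 1  # one indivisible step, like Redis INCR
        return self.value


async def racy_allow(c: Counter) -> bool:
    count = await c.get()       # read
    if count >= LIMIT:          # decide
        return False
    await c.set(count + 1)      # write, after another request read the same count
    return True


async def atomic_allow(c: Counter) -> bool:
    return await c.incr() <= LIMIT  # increment first, then compare


async def demo():
    racy = Counter(9)
    racy_results = await asyncio.gather(racy_allow(racy), racy_allow(racy))
    atomic = Counter(9)
    atomic_results = await asyncio.gather(atomic_allow(atomic), atomic_allow(atomic))
    return racy_results, atomic_results


racy_results, atomic_results = asyncio.run(demo())
```

In the racy version both requests read 9 and both are admitted; in the atomic version the second increment sees 11 and is rejected.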
| Algorithm | Data structure | Accuracy | Cost | Notes |
|---|---|---|---|---|
| Fixed window | INCR + EXPIRE | low (boundary burst) | 1 key | cheapest; allows 2× at edges |
| Sliding window | sorted set of timestamps | high | O(log n) per req | precise; what most APIs want |
| Token bucket | hash (tokens + last-refill) | high, allows bursts | small Lua | smooth refill + burst capacity |
Fixed window
Bucket the clock into slices of window seconds, INCR the bucket, set EXPIRE on
the first hit. Cheap, but a client can fire limit requests at the end of one window
and limit more at the start of the next — 2 × limit in a heartbeat.
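To make the boundary burst concrete, here is a small in-memory model of the same bucketing arithmetic. It is a sketch, not the Redis implementation: FixedWindowModel is a hypothetical class, timestamps are passed in by hand, and key expiry is left out.

```python
from collections import defaultdict


class FixedWindowModel:
    """In-memory model of INCR-per-bucket limiting (no Redis, no expiry)."""

    def __init__(self, limit: int, window: int) -> None:
        self.limit = limit
        self.window = window
        self.counts: defaultdict[int, int] = defaultdict(int)

    def allow(self, now: float) -> bool:
        bucket = int(now) // self.window  # same bucketing as the clock slices
        self.counts[bucket] += 1
        return self.counts[bucket] <= self.limit


limiter = FixedWindowModel(limit=10, window=60)

# 10 requests at t=59s (end of bucket 0), then 10 more at t=60s (start of bucket 1).
late = [limiter.allow(59.0) for _ in range(10)]
early = [limiter.allow(60.0) for _ in range(10)]
admitted_in_two_seconds = sum(late) + sum(early)

# The limit only bites once a single bucket is full.
eleventh_in_bucket = limiter.allow(60.5)
```

All 20 requests are admitted within about a second of each other even though the limit is 10 per 60 seconds, which is the edge case the sliding-window variant avoids.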
```python
import time

FIXED_WINDOW_LUA = """
local count = redis.call('INCR', KEYS[1])
if count == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
local ttl = redis.call('TTL', KEYS[1])
return {count, ttl}
"""


async def fixed_window_allow(r, client_id: str, limit: int, window: int) -> bool:
    bucket = int(time.time()) // window
    key = f"rate:fixed:{client_id}:{bucket}"
    count, _ttl = await r.eval(FIXED_WINDOW_LUA, 1, key, window)
    return count <= limit
```
Sliding window (sorted sets + Lua)
Keep one sorted set per client where each member is a request and its score is the
timestamp. On each request: drop members older than the window (ZREMRANGEBYSCORE),
count what’s left (ZCARD), reject if at limit, else add the request and refresh the
TTL. Precise, and the whole sequence is one atomic Lua script.
```python
import time
import uuid

SLIDING_WINDOW_LUA = """
-- KEYS[1] = zset key
-- ARGV: now_ms, window_ms, limit, member, ttl_seconds
local now = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local limit = tonumber(ARGV[3])

redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, now - window)
local count = redis.call('ZCARD', KEYS[1])
if count >= limit then
    local oldest = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
    return {0, oldest[2]}
end
redis.call('ZADD', KEYS[1], now, ARGV[4])
redis.call('EXPIRE', KEYS[1], ARGV[5])
return {1, 0}
"""


async def sliding_window_allow(r, client_id: str, limit: int, window: int) -> bool:
    now_ms = int(time.time() * 1000)
    member = f"{now_ms}-{uuid.uuid4()}"  # unique: two reqs in the same ms must not collide
    allowed, _oldest = await r.eval(
        SLIDING_WINDOW_LUA,
        1,
        f"rate:sliding:{client_id}",
        now_ms,
        window * 1000,
        limit,
        member,
        window,
    )
    return bool(allowed)
```
Token bucket
A bucket holds up to capacity tokens, refilled at rate tokens/second. Each request
spends one token; an empty bucket rejects. It allows bursts (up to capacity) while
enforcing a long-run average — the model behind golang.org/x/time/rate. Store
tokens and last_refill in a hash and reconcile lazily in Lua:
```lua
-- KEYS[1] = bucket hash
-- ARGV: capacity, refill_per_sec, now, requested
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local want = tonumber(ARGV[4])

local data = redis.call('HMGET', KEYS[1], 'tokens', 'ts')
local tokens = tonumber(data[1]) or capacity
local ts = tonumber(data[2]) or now

tokens = math.min(capacity, tokens + (now - ts) * refill)  -- lazy refill
local allowed = tokens >= want
if allowed then tokens = tokens - want end

redis.call('HMSET', KEYS[1], 'tokens', tokens, 'ts', now)
redis.call('EXPIRE', KEYS[1], math.ceil(capacity / refill) * 2)
return allowed and 1 or 0
```
As a FastAPI dependency
In Express you’d register rate-limit middleware globally; in Go you’d wrap your
http.Handler. FastAPI’s idiomatic unit is a dependency — it runs before the
route, can read the request, and can short-circuit by raising HTTPException. A
dependency targets specific routes precisely; a middleware applies to everything.
```typescript
import rateLimit from "express-rate-limit"; // in-memory by default, NOT shared across instances

app.use("/api", rateLimit({ windowMs: 60_000, max: 10 }));
```

```go
func RateLimit(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		if !allow(req.Context(), clientID(req), 10, 60) {
			w.Header().Set("Retry-After", "60")
			http.Error(w, "Too Many Requests", http.StatusTooManyRequests)
			return
		}
		next.ServeHTTP(w, req)
	})
}
```

```python
from typing import Annotated

from fastapi import Depends, HTTPException, Request, status


def rate_limit(limit: int = 10, window: int = 60):
    """Factory -> a FastAPI dependency. Add it to any route that needs limiting."""

    async def dependency(request: Request) -> None:
        r = request.app.state.redis
        # request.client can be None, so fall back to a shared bucket in that case.
        host = request.client.host if request.client else "unknown"
        client_id = request.headers.get("x-api-key") or host
        if not await sliding_window_allow(r, client_id, limit, window):
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Rate limit exceeded",
                headers={"Retry-After": str(window)},
            )

    return dependency


@app.get("/api/search", dependencies=[Depends(rate_limit(limit=5, window=60))])
async def search(q: str) -> list[str]:
    ...
```
Pub/Sub
Redis Pub/Sub is fire-and-forget real-time messaging: publish to a channel, every current subscriber gets it, and if nobody’s listening the message vanishes. It’s perfect for cache invalidation across instances and live UI nudges, and wrong for anything that must not be lost.
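The delivery semantics can be modeled in a few lines of plain Python. This is an in-process sketch, not a Redis client: FireAndForgetBus and its methods are hypothetical names, and the only thing it mirrors is that a message reaches whoever is subscribed at publish time and nobody else.

```python
from collections import defaultdict
from collections.abc import Callable


class FireAndForgetBus:
    """In-process model of Pub/Sub delivery semantics (not a Redis client)."""

    def __init__(self) -> None:
        self._subs: defaultdict[str, list[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Callable[[str], None]) -> None:
        self._subs[channel].append(handler)

    def publish(self, channel: str, message: str) -> int:
        handlers = self._subs[channel]
        for handler in handlers:
            handler(message)
        # Nothing is stored: with zero handlers the message is simply gone.
        return len(handlers)


bus = FireAndForgetBus()
received: list[str] = []

missed = bus.publish("invalidate", "product:1")     # nobody listening yet
bus.subscribe("invalidate", received.append)
delivered = bus.publish("invalidate", "product:2")  # one live subscriber
```

The first message is never seen by the late subscriber, which is why Pub/Sub suits best-effort signals such as cache invalidation hints.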
The async client gives you a pubsub() object you consume with async for — the
same async for you learned for streams in
Module 06.
```typescript
const pub = new Redis();
await pub.publish("invalidate", JSON.stringify({ key: "product:1" }));

const sub = new Redis(); // a subscriber connection is dedicated and can't run other cmds
sub.subscribe("invalidate");
sub.on("message", (_chan, msg) => cache.delete(JSON.parse(msg).key));
```

```go
rdb.Publish(ctx, "invalidate", `{"key":"product:1"}`)

sub := rdb.Subscribe(ctx, "invalidate")
for msg := range sub.Channel() {
	cache.Delete(parseKey(msg.Payload))
}
```

```python
import json

# --- Publisher (any client) ---
await r.publish("invalidate", json.dumps({"key": "product:1"}))


# --- Subscriber (typically a background task in the lifespan) ---
async def invalidation_listener(r) -> None:
    async with r.pubsub() as pubsub:
        await pubsub.subscribe("invalidate")
        async for message in pubsub.listen():
            if message["type"] != "message":  # skip the "subscribe" ack
                continue
            payload = json.loads(message["data"])
            local_cache.pop(payload["key"], None)
```
A note on Redis Streams
Pub/Sub drops messages with no live subscriber. Redis Streams (XADD / XREAD
/ consumer groups via XREADGROUP) is the durable, replayable cousin: messages
persist, late consumers can read history, and consumer groups give you Kafka-like
acknowledged consumption, where each entry goes to one consumer in the group and
un-acked entries stay pending until they are acknowledged or claimed by another
consumer (XAUTOCLAIM). It’s a genuine lightweight event log.
But if you need durable events as a first-class architectural concern — ordering guarantees, long retention, many consumer groups, real throughput — reach for Kafka, covered next in Module 11. Rule of thumb: Streams for lightweight intra-app work queues you already have Redis for; Kafka when the event log is the system.
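For the lightweight work-queue case, the produce and consume loop might look roughly like the sketch below. It assumes a redis.asyncio client passed in as r, a consumer group that already exists (created once with XGROUP CREATE), and the default RESP2 reply shape of xreadgroup. The stream name, group name, and both function names are hypothetical.

```python
from typing import Any

STREAM = "events"   # hypothetical stream name
GROUP = "workers"   # hypothetical consumer group, assumed to exist already


async def produce(r: Any, payload: dict[str, str]) -> str:
    """Append one entry to the stream; Redis assigns and returns the entry ID."""
    return await r.xadd(STREAM, payload)


async def consume_once(r: Any, consumer: str, handle) -> int:
    """Read entries not yet delivered to this group, handle them, then XACK each one."""
    replies = await r.xreadgroup(GROUP, consumer, {STREAM: ">"}, count=10, block=1000)
    handled = 0
    for _stream, entries in replies or []:
        for entry_id, fields in entries:
            handle(fields)
            # Acknowledge only after the handler returns, so a crash mid-handler
            # leaves the entry pending instead of losing it.
            await r.xack(STREAM, GROUP, entry_id)
            handled += 1
    return handled
```

Acknowledging after the handler runs gives at-least-once processing, so handlers should tolerate seeing the same entry twice.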
Distributed locks & idempotency
A simple lock with SET NX
A distributed lock coordinates “only one worker does X at a time” across instances.
The primitive is SET key token NX EX ttl: set-if-not-exists, with a TTL so a
crashed holder doesn’t deadlock the lock forever. Release by deleting only if you
still own it — checked atomically in Lua, because a naive GET-then-DEL can
delete a lock another worker acquired after yours expired.
```python
import uuid
from contextlib import asynccontextmanager

RELEASE_LUA = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
end
return 0
"""


@asynccontextmanager
async def redis_lock(r, name: str, ttl: int = 10):
    token = str(uuid.uuid4())  # proves ownership on release
    acquired = await r.set(f"lock:{name}", token, nx=True, ex=ttl)
    if not acquired:
        raise RuntimeError(f"could not acquire lock {name}")
    try:
        yield
    finally:
        await r.eval(RELEASE_LUA, 1, f"lock:{name}", token)  # release iff still ours
```
Idempotency keys
The flip side of locking: make a request safe to retry. The client sends an
Idempotency-Key header; you SET NX it before doing the work and return the stored
response on a replay. This is how Stripe-style “retrying a charge won’t double-charge”
works, and it’s a handful of Redis commands.
```python
from fastapi import HTTPException, Request


async def idempotent(request: Request) -> str | None:
    key = request.headers.get("idempotency-key")
    if key is None:
        return None
    r = request.app.state.redis
    # Reserve the key; nx=True leaves it untouched if an earlier request claimed it.
    first = await r.set(f"idem:{key}", "in-progress", nx=True, ex=86400)
    if not first:
        # Replay: the key holds "in-progress" or the stored response. This sketch
        # rejects with 409; returning the stored response is left to the caller.
        cached = await r.get(f"idem:{key}")
        raise HTTPException(409, detail=f"duplicate request (status: {cached})")
    # On success, the handler overwrites idem:{key} with the serialized response.
    return key
```
Gotchas TS/Go devs hit
Thundering herd / cache stampede
When a hot key expires, every concurrent request misses at once and stampedes the
database with the same expensive query — the thundering herd. The fix is
single-flight: only one caller recomputes; the rest wait for that result. In Go
this is golang.org/x/sync/singleflight; in Python you build it with an
asyncio.Lock per key (in-process) plus a short Redis SET NX lock (cross-process).
```python
import asyncio

_locks: dict[str, asyncio.Lock] = {}


async def get_single_flight(r, db, product_id: int) -> Product | None:
    key = f"product:{product_id}"
    cached = await r.get(key)
    if cached is not None:
        return Product.model_validate_json(cached)

    # Per-key in-process lock: collapse concurrent misses in THIS worker to one load.
    lock = _locks.setdefault(key, asyncio.Lock())
    async with lock:
        cached = await r.get(key)  # double-check: someone may have filled it
        if cached is not None:
            return Product.model_validate_json(cached)
        product = await load_from_db(db, product_id)
        if product is not None:
            await r.set(key, product.model_dump_json(), ex=300)
        return product
```

For cross-instance herds, add jitter to your TTLs (ex=300 + random.randint(0, 30))
so a million keys set in the same deploy don’t all expire in the same second, and/or
gate the recompute with the Redis SET NX lock from above.
The rest of the checklist
| Gotcha | What bites you | Discipline |
|---|---|---|
| Stale cache | serving old data after a write | delete key after DB commit; short TTL as a backstop |
| No TTL | keys live forever, Redis OOMs | every cache key gets an ex=; reserve no-TTL for durable state |
| TTL stampede | mass-set keys expire together | add random jitter to TTLs |
| bytes vs str | if value == "x" silently false | decode_responses=True (or decode explicitly) |
| Empty vs missing | []/{} are falsy like missing | check await r.exists(k) when it matters |
| pickle | RCE from a poisoned entry | JSON only across trust boundaries |
| Caching None | one miss poisons the cache for the TTL | decide explicitly whether negatives are cached, and use a short TTL if so |
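Two of these rows come down to plain Python semantics and can be checked without Redis. The sketch below uses hypothetical helper names. It shows a bytes reply failing a str comparison, and a truthiness check mistaking a cached empty list for a miss. Comparing against None is shown as a cheaper alternative to an extra EXISTS round trip when the value has already been fetched.

```python
raw = b"x"                                   # what a client without decode_responses=True returns
naive_match = raw == "x"                     # bytes never compare equal to str in Python 3
decoded_match = raw.decode("utf-8") == "x"   # decode explicitly before comparing


def is_hit_truthy(value) -> bool:
    """Buggy check: treats "", [] and {} as a cache miss."""
    return bool(value)


def is_hit(value) -> bool:
    """Only None means the key was absent."""
    return value is not None


empty_result = []   # e.g. a deserialized "no matches" answer that was cached on purpose
```

With the truthy check, an intentionally cached empty result is reloaded from the source on every request.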
Practice
Wire cache-aside and a sliding-window limiter onto a real FastAPI service against the shared Redis instance.