Skip to content

External API Client with Circuit Breaker

Problem: ScoopJoy integrates with a third-party dairy-supplier inventory API that’s unreliable and rate-limited. Hammering it with retries when it’s already down wastes worker time and risks getting your API key banned.

Solution: Build a reusable API client with Redis-backed caching, a circuit breaker, per-endpoint rate limiting, and exponential-backoff retries. The circuit breaker is the heart of it: after enough consecutive failures it stops calling the service entirely, then probes for recovery before reopening the floodgates.

The client and its consumers live in a few small modules under the scoopjoy app:

  • apps/scoopjoy/scoopjoy/
    • utils/
      • api_client.py — the resilient client + circuit breaker + rate limiter
    • integrations/
      • supplier_sync.py — the background job that uses the client
    • hooks.py — registers the scheduler event

The breaker is a three-state machine. It starts closed (calls flow through). After failure_threshold failures it trips to open and rejects calls outright. Once recovery_timeout elapses it moves to half_open and allows a single probe request: success closes the circuit again, failure reopens it.

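Circuit breaker state machine:

  closed → open: failure_threshold failures are recorded without an intervening success
  open → half_open: recovery_timeout seconds have passed since the last failure (one probe is allowed)
  half_open → closed: the probe request succeeds
  half_open → open: the probe request fails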

The whole client lives in one module. The CircuitBreaker and APIRateLimiter classes both persist their state in frappe.cache (Redis), so every Gunicorn web worker and every background-job worker shares the same view. ExternalAPIClient wires them together with retries.

The CircuitBreaker stores {"state", "failures", "last_failure"} as JSON in Redis. can_execute() is where the state transitions happen: an open circuit flips to half_open once the recovery window passes and lets a single probe through.

scoopjoy/scoopjoy/utils/api_client.py
import frappe
import requests
import json
import time
import hashlib
from frappe.utils import cint
class CircuitBreaker:
    """
    Circuit breaker: stops calling a failing service after N failures.
    States: CLOSED (normal) -> OPEN (reject calls) -> HALF_OPEN (test recovery).

    The state is stored in frappe.cache as a JSON dict with the keys
    ``state``, ``failures`` and ``last_failure`` (epoch seconds), plus
    ``probe_started`` (epoch seconds) while half-open. Every process that
    uses the same cache therefore shares one view of the circuit.

    NOTE(review): each transition is a cache read followed by a separate
    cache write, not an atomic compare-and-set. Processes racing on a
    transition may both act on the old state, so treat the breaker as
    best-effort.
    """

    STATE_CLOSED = "closed"
    STATE_OPEN = "open"
    STATE_HALF_OPEN = "half_open"

    def __init__(self, name, failure_threshold=5, recovery_timeout=60):
        """
        Args:
            name: Identifier of the protected service; namespaces the cache key.
            failure_threshold: Number of failures recorded without an
                intervening success that trips the circuit open.
            recovery_timeout: Seconds the circuit stays open after the last
                failure before a probe request is let through.
        """
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self._cache_key = f"circuit_breaker:{name}"

    def _get_state(self):
        """Return the stored state dict, or a fresh CLOSED state if none is cached."""
        data = frappe.cache.get_value(self._cache_key)
        if not data:
            return {"state": self.STATE_CLOSED, "failures": 0, "last_failure": 0}
        return json.loads(data)

    def _set_state(self, state_data):
        """Persist *state_data*; the entry expires 10 recovery windows after this write."""
        frappe.cache.set_value(
            self._cache_key,
            json.dumps(state_data),
            expires_in_sec=self.recovery_timeout * 10,
        )

    def can_execute(self):
        """Return True if the circuit allows a request right now.

        - CLOSED: always allowed.
        - OPEN: rejected until ``recovery_timeout`` seconds have passed since
          the last failure. The circuit then moves to HALF_OPEN, and this
          caller becomes the single probe.
        - HALF_OPEN: other callers are rejected while the probe is outstanding.
          If the probe's outcome is never recorded (e.g. the caller died or
          hit an unrelated error) for longer than ``recovery_timeout``, a new
          probe is allowed so the circuit cannot stay stuck half-open.
        """
        state = self._get_state()
        now = time.time()
        current = state.get("state")

        if current == self.STATE_CLOSED:
            return True

        if current == self.STATE_OPEN:
            if now - state.get("last_failure", 0) > self.recovery_timeout:
                state["state"] = self.STATE_HALF_OPEN
                state["probe_started"] = now
                self._set_state(state)
                return True  # this caller is the probe
            return False

        if current == self.STATE_HALF_OPEN:
            # A probe is already in flight; only replace it once it is stale.
            if now - state.get("probe_started", 0) > self.recovery_timeout:
                state["probe_started"] = now
                self._set_state(state)
                return True
            return False

        return False

    def record_success(self):
        """Close the circuit and reset the failure count."""
        self._set_state({
            "state": self.STATE_CLOSED,
            "failures": 0,
            "last_failure": 0,
        })

    def record_failure(self):
        """Count a failure; open the circuit when the threshold is reached.

        A failure while HALF_OPEN means the probe failed, so the circuit
        reopens immediately regardless of the failure count.
        """
        state = self._get_state()
        state["failures"] = state.get("failures", 0) + 1
        state["last_failure"] = time.time()
        if (
            state.get("state") == self.STATE_HALF_OPEN
            or state["failures"] >= self.failure_threshold
        ):
            state["state"] = self.STATE_OPEN
            state.pop("probe_started", None)
        self._set_state(state)
class APIRateLimiter:
    """Fixed-window rate limiter for outgoing API calls.

    Time is cut into consecutive windows of ``window_seconds``. Each window
    has its own counter in frappe.cache under
    ``api_rate_limit:<name>:<window index>``, and at most ``max_requests``
    acquisitions succeed per window.

    NOTE(review): the counter is read and written back in two separate cache
    calls, so concurrent callers may occasionally push a window slightly
    past ``max_requests``.
    """

    def __init__(self, name, max_requests=30, window_seconds=60):
        self.name = name
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._cache_key = f"api_rate_limit:{name}"

    def acquire(self):
        """Try to take one slot in the current window.

        Returns:
            True if the request may proceed (the window's counter was
            incremented), False if the window is already full.
        """
        bucket_index = int(time.time() // self.window_seconds)
        bucket = f"{self._cache_key}:{bucket_index}"
        used = cint(frappe.cache.get_value(bucket))
        allowed = used < self.max_requests
        if allowed:
            # The counter only needs to outlive its own window.
            frappe.cache.set_value(bucket, used + 1, expires_in_sec=self.window_seconds)
        return allowed

    def wait_time(self):
        """Return the number of seconds until the current window ends."""
        now = time.time()
        next_window_start = (int(now // self.window_seconds) + 1) * self.window_seconds
        return next_window_start - now
class ExternalAPIClient:
    """
    Resilient API client with caching, circuit breaker, rate limiting, and retry.

    Usage:
        client = ExternalAPIClient(
            base_url="https://api.supplier.example.com/v1",
            api_key="sk-xxxx",
            name="supplier_api",
        )
        products = client.get("/products", cache_ttl=300)  # cached GET
        result = client.post("/orders", json={"sku": "VAN-001", "qty": 100})

    Exceptions callers should expect from get/post/put/delete:
        - builtin ``ConnectionError``: the circuit breaker or the rate limiter
          refused the call before any HTTP request was sent;
        - ``requests.exceptions.HTTPError``: raised immediately (no retry) for
          a 4xx response other than 429;
        - the last ``requests`` exception (``HTTPError``, ``ConnectionError``
          or ``Timeout``) once retries are exhausted, or once retrying is
          stopped by the breaker or the rate limiter.
    ``requests.exceptions.ConnectionError`` is NOT a subclass of the builtin
    ``ConnectionError``. Callers that want to handle every failure must catch
    both, e.g. ``(ConnectionError, requests.exceptions.RequestException)``.
    Other exceptions (e.g. from decoding a non-JSON body) propagate unchanged.
    """

    def __init__(
        self,
        base_url,
        api_key=None,
        name="external_api",
        max_retries=3,
        timeout=30,
        rate_limit_requests=30,
        rate_limit_window=60,
        circuit_failure_threshold=5,
        circuit_recovery_timeout=120,
    ):
        """
        Args:
            base_url: Service root; a trailing "/" is stripped.
            api_key: If given, sent as ``Authorization: Bearer <api_key>``.
            name: Used as the logger name and to namespace the breaker,
                rate-limit and response-cache keys.
            max_retries: Extra attempts after the first for retryable failures.
            timeout: Per-request timeout in seconds, passed to requests.
            rate_limit_requests: Max outgoing calls per rate-limit window.
            rate_limit_window: Rate-limit window length in seconds.
            circuit_failure_threshold: Failures that trip the breaker open.
            circuit_recovery_timeout: Seconds before the breaker allows a probe.
        """
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.name = name
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "ScoopJoy-ERP/1.0",
            "Accept": "application/json",
        })
        if api_key:
            self.session.headers["Authorization"] = f"Bearer {api_key}"
        # Resilience components, both backed by frappe.cache (Redis)
        self.circuit = CircuitBreaker(
            name=name,
            failure_threshold=circuit_failure_threshold,
            recovery_timeout=circuit_recovery_timeout,
        )
        self.rate_limiter = APIRateLimiter(
            name=name,
            max_requests=rate_limit_requests,
            window_seconds=rate_limit_window,
        )

    def _cache_key(self, method, endpoint, params=None):
        """Build a deterministic response-cache key from method, URL and params."""
        key_data = f"{method}:{self.base_url}{endpoint}:{json.dumps(params or {}, sort_keys=True)}"
        return f"api_cache:{self.name}:{hashlib.md5(key_data.encode()).hexdigest()}"

    def _log_request(self, method, endpoint, status_code=None, error=None, duration=None):
        """Log one API request (status, timing and optional error) to this client's logger."""
        # duration defaults to None; formatting None with :.2f would raise.
        timing = "n/a" if duration is None else f"{duration:.2f}s"
        frappe.logger(self.name).info(
            f"{method} {endpoint} -> {status_code or 'ERROR'} ({timing})"
            + (f" ERROR: {error}" if error else "")
        )

    def _request(self, method, endpoint, cache_ttl=0, **kwargs):
        """Core request method with all resilience patterns.

        Gates run in order: circuit breaker, rate limiter, cache (GET only).
        Then the HTTP call is made with retry and jittered exponential backoff.

        Args:
            method: HTTP verb, e.g. "GET" or "POST".
            endpoint: Path appended to ``base_url``.
            cache_ttl: GET only. Cache the decoded body for this many seconds;
                0 disables caching.
            **kwargs: Passed through to ``requests.Session.request``.

        Returns:
            The decoded JSON body, or None when the response body is empty
            (e.g. 204 No Content).
        """
        # 1. Circuit breaker gate
        if not self.circuit.can_execute():
            raise ConnectionError(
                f"Circuit breaker OPEN for {self.name}. "
                f"Service appears down. Retry after {self.circuit.recovery_timeout}s."
            )

        # 2. Rate limiter gate -- this token pays for the first attempt.
        if not self.rate_limiter.acquire():
            wait = self.rate_limiter.wait_time()
            raise ConnectionError(
                f"Rate limit exceeded for {self.name}. Try again in {wait:.0f}s."
            )

        # 3. Cache lookup (GET only)
        use_cache = method == "GET" and cache_ttl > 0
        cache_key = None
        if use_cache:
            cache_key = self._cache_key(method, endpoint, kwargs.get("params"))
            cached = frappe.cache.get_value(cache_key)
            if cached:
                return json.loads(cached)

        # 4. Execute with retry + exponential backoff
        url = f"{self.base_url}{endpoint}"
        last_exception = None
        for attempt in range(self.max_retries + 1):
            if attempt > 0:
                # Stop retrying once the breaker has tripped (possibly because
                # of this request's own failures). Repeatedly calling a service
                # that is down is exactly what the breaker exists to prevent.
                if not self.circuit.can_execute():
                    break
                # Backoff of 1s, 2s, 4s, ... plus up to 1s of jitter.
                time.sleep((2 ** (attempt - 1)) + (time.time() % 1))
                # Every retry is another outgoing call, so it needs its own
                # rate-limit token; give up rather than exceed the quota.
                if not self.rate_limiter.acquire():
                    break

            start = time.time()
            try:
                response = self.session.request(
                    method, url, timeout=self.timeout, **kwargs
                )
                duration = time.time() - start
                self._log_request(method, endpoint, response.status_code, duration=duration)
                response.raise_for_status()
                self.circuit.record_success()
                # 204 No Content (typical for DELETE) or any empty body has no
                # JSON to decode, and response.json() would raise on it.
                if response.status_code == 204 or not response.content:
                    result = None
                else:
                    result = response.json()
                if use_cache:
                    frappe.cache.set_value(
                        cache_key,
                        json.dumps(result),
                        expires_in_sec=cache_ttl,
                    )
                return result
            except requests.exceptions.HTTPError as e:
                # Compare with None explicitly: a requests.Response is falsy
                # for any 4xx/5xx status (its truth value is response.ok), so
                # `if e.response` would always discard the status here.
                status = e.response.status_code if e.response is not None else None
                # Don't retry 4xx client errors (except 429). They won't fix
                # themselves and don't mean the service is down.
                if status is not None and 400 <= status < 500 and status != 429:
                    self.circuit.record_success()  # Not a server failure
                    self._log_request(method, endpoint, status, str(e), time.time() - start)
                    raise
                last_exception = e
            except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
                self._log_request(
                    method, endpoint, error=str(e), duration=time.time() - start
                )
                last_exception = e

            # Only retryable failures (5xx, 429, connection errors, timeouts)
            # reach this point; each one counts against the circuit breaker.
            self.circuit.record_failure()

        raise last_exception or ConnectionError(
            f"Failed after {self.max_retries + 1} attempts"
        )

    def get(self, endpoint, cache_ttl=0, **kwargs):
        """GET *endpoint*; set ``cache_ttl`` > 0 to cache the decoded response."""
        return self._request("GET", endpoint, cache_ttl=cache_ttl, **kwargs)

    def post(self, endpoint, **kwargs):
        """POST to *endpoint* (never cached)."""
        return self._request("POST", endpoint, **kwargs)

    def put(self, endpoint, **kwargs):
        """PUT to *endpoint* (never cached)."""
        return self._request("PUT", endpoint, **kwargs)

    def delete(self, endpoint, **kwargs):
        """DELETE *endpoint* (never cached); returns None for an empty body."""
        return self._request("DELETE", endpoint, **kwargs)

Note how the gates run in order: circuit breaker, then rate limiter, then cache. A 4xx client error (other than 429) calls record_success() and re-raises — a bad request is your bug, not a sign the supplier is down, so it must not trip the breaker. Connection errors, timeouts, 5xx, and 429 all count as failures and feed the retry loop with jittered exponential backoff.

Step 2: Using the client — supplier stock sync

A factory builds a client configured from ScoopJoy Settings (note the get_password call to read the encrypted API key), and the background job pages through the supplier’s inventory, updating matching Item records.

scoopjoy/scoopjoy/integrations/supplier_sync.py
import frappe
from frappe.utils import flt
from scoopjoy.utils.api_client import ExternalAPIClient
def get_supplier_client():
    """Build an ExternalAPIClient for the dairy supplier from ScoopJoy Settings.

    Reads ``supplier_api_url`` and the encrypted ``supplier_api_key`` (via
    ``get_password``) from the cached ScoopJoy Settings document. It then
    applies the supplier-specific limits: 20 requests per 60 s, with the
    circuit opening after 3 failures and probing again after 300 s.
    """
    cfg = frappe.get_cached_doc("ScoopJoy Settings")
    resilience = {
        "rate_limit_requests": 20,
        "rate_limit_window": 60,
        "circuit_failure_threshold": 3,
        "circuit_recovery_timeout": 300,
    }
    return ExternalAPIClient(
        base_url=cfg.supplier_api_url,
        api_key=cfg.get_password("supplier_api_key"),
        name="dairy_supplier",
        **resilience,
    )
def sync_supplier_stock():
    """Background job: fetch stock levels from the supplier and update Items.

    Pages through ``/inventory`` (100 products per page). It stops at the
    first empty page or when an API call fails. For each product whose SKU
    matches an ``Item Supplier.supplier_part_no``, it updates the parent
    Item's ``custom_supplier_*`` fields. When ``available_qty`` is below 50,
    it also raises a low-stock ToDo. Per-product failures are collected and
    logged together at the end rather than aborting the run.

    Returns:
        dict: ``{"synced": <items updated>, "errors": <products that failed>}``.
    """
    # Imported here only to name requests' exception base class in the
    # except clause below.
    import requests

    client = get_supplier_client()
    page = 1
    synced = 0
    errors = []
    while True:
        try:
            data = client.get(
                "/inventory",
                params={"page": page, "per_page": 100},
                cache_ttl=120,  # cache each page for 2 min
            )
        except (ConnectionError, requests.exceptions.RequestException) as e:
            # The builtin ConnectionError means the client refused the call
            # (circuit open or rate-limited). A RequestException is the
            # underlying requests error once retries run out, or a 4xx. The
            # two hierarchies are unrelated, so catch both and stop cleanly.
            frappe.log_error(title="Supplier Sync: API Error", message=str(e))
            break

        # An empty body may decode to None; treat it like an empty page.
        products = (data or {}).get("products", [])
        if not products:
            break

        for product in products:
            try:
                supplier_sku = product["sku"]
                available_qty = flt(product["available_qty"])
                unit_price = flt(product["unit_price"])
                # Find the matching Item by supplier SKU
                item_code = frappe.db.get_value(
                    "Item Supplier",
                    {"supplier_part_no": supplier_sku},
                    "parent",
                )
                if not item_code:
                    continue
                frappe.db.set_value("Item", item_code, {
                    "custom_supplier_stock_qty": available_qty,
                    "custom_supplier_unit_price": unit_price,
                    "custom_supplier_stock_updated_at": frappe.utils.now_datetime(),
                })
                if available_qty < 50:
                    _create_low_stock_alert(item_code, supplier_sku, available_qty)
                synced += 1
            except Exception as e:
                # Record and move on: one bad product must not abort the sync.
                # Guard .get() so a non-dict entry can't crash the handler itself.
                sku = product.get("sku") if isinstance(product, dict) else product
                errors.append(f"{sku}: {e}")

        page += 1

    frappe.logger("supplier_sync").info(
        f"Supplier stock sync complete: {synced} items synced, {len(errors)} errors"
    )
    if errors:
        frappe.log_error(
            title="Supplier Sync Errors",
            message="\n".join(errors[:50]),  # limit log size
        )
    return {"synced": synced, "errors": len(errors)}
def _create_low_stock_alert(item_code, supplier_sku, qty):
    """Create a High-priority ToDo for the procurement team about low supplier stock.

    Does nothing if an Open ToDo referencing this Item with a
    "Supplier stock low" description already exists, so repeated syncs don't
    pile up duplicates. The ToDo is allocated to the user in
    ``ScoopJoy Settings.procurement_manager``.

    Args:
        item_code: Name of the Item the alert refers to.
        supplier_sku: The supplier's SKU, included in the description.
        qty: Units the supplier reports as available.
    """
    # One marker string for both the duplicate check and the description, so
    # they can't drift apart. Its casing must match what is written: LIKE is
    # case-sensitive on some databases (e.g. PostgreSQL).
    marker = "Supplier stock low"
    existing = frappe.db.exists("ToDo", {
        "reference_type": "Item",
        "reference_name": item_code,
        "status": "Open",
        "description": ["like", f"%{marker}%"],
    })
    if existing:
        return
    frappe.get_doc({
        "doctype": "ToDo",
        "description": f"{marker} for {item_code} (SKU: {supplier_sku}): only {qty} units left",
        "reference_type": "Item",
        "reference_name": item_code,
        "allocated_to": frappe.db.get_single_value("ScoopJoy Settings", "procurement_manager"),
        "priority": "High",
    }).insert(ignore_permissions=True)

When the breaker is open, the very first client.get(...) raises ConnectionError, the loop breaks immediately, and the job logs and exits instead of grinding through hundreds of doomed retries.

Register the sync as a cron-based scheduler event so it runs every four hours. See Scheduled Tasks for more on scheduler events.

scoopjoy/scoopjoy/hooks.py
# Frappe scheduler hooks: run the supplier stock sync on a cron schedule.
# "0 */4 * * *" fires at minute 0 of every 4th hour (00:00, 04:00, ..., 20:00).
scheduler_events = dict(
    cron={
        "0 */4 * * *": ["scoopjoy.integrations.supplier_sync.sync_supplier_stock"],
    },
)