External API Client with Circuit Breaker
Problem: ScoopJoy integrates with a third-party dairy-supplier inventory API that’s unreliable and rate-limited. Hammering it with retries when it’s already down wastes worker time and risks getting your API key banned.
Solution: Build a reusable API client with Redis-backed caching, a circuit breaker, per-client rate limiting (one shared request budget per client name), and exponential-backoff retries. The circuit breaker is the heart of it: after enough consecutive failures it stops calling the service entirely, then probes for recovery before reopening the floodgates.
The client and its consumers live in a few small modules under the scoopjoy app:
- apps/scoopjoy/scoopjoy/
  - utils/
    - api_client.py — the resilient client + circuit breaker + rate limiter
  - integrations/
    - supplier_sync.py — the background job that uses the client
  - hooks.py — registers the scheduler event (Frappe reads hooks.py from the app package root, not from integrations/)
Circuit breaker states
The breaker is a three-state machine. It starts closed (calls flow through). After
failure_threshold consecutive failures it trips to open and rejects calls outright. Once
recovery_timeout elapses it moves to half_open and allows a single probe request:
success closes the circuit again, failure reopens it.
stateDiagram-v2
    [*] --> Closed
    Closed --> Open: failures >= threshold
    Open --> HalfOpen: recovery_timeout elapsed
    HalfOpen --> Closed: probe succeeds
    HalfOpen --> Open: probe fails
    Closed --> Closed: success (reset count)
Step 1: The resilient client
The whole client lives in one module. The CircuitBreaker and APIRateLimiter
classes both persist their state in frappe.cache (Redis) so every Gunicorn worker
shares the same view, and ExternalAPIClient wires them together with retries.
The CircuitBreaker stores {"state", "failures", "last_failure"} as JSON in Redis.
can_execute() is where the open → half_open transition happens: once the recovery
window passes, an open circuit flips to half_open and lets a single probe through.
record_success() and record_failure() handle the transitions back to closed or open.
import hashlib
import json
import random
import time

import frappe
import requests
from frappe.utils import cint
class CircuitBreaker:
    """
    Circuit breaker: stops calling a failing service after N failures.

    States: CLOSED (normal) -> OPEN (reject calls) -> HALF_OPEN (test recovery).

    State lives in frappe.cache as JSON {"state", "failures", "last_failure"}
    under ``circuit_breaker:<name>``, so every process sharing the cache sees
    the same breaker. Updates are read-modify-write (not atomic), so
    concurrent callers can occasionally race.
    """

    STATE_CLOSED = "closed"
    STATE_OPEN = "open"
    STATE_HALF_OPEN = "half_open"

    def __init__(self, name, failure_threshold=5, recovery_timeout=60):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self._cache_key = f"circuit_breaker:{name}"

    def _get_state(self):
        """Load the breaker state; a missing/expired key means a fresh CLOSED breaker."""
        data = frappe.cache.get_value(self._cache_key)
        if not data:
            return {"state": self.STATE_CLOSED, "failures": 0, "last_failure": 0}
        return json.loads(data)

    def _set_state(self, state_data):
        """Persist the breaker state; the cache entry expires after 10 recovery windows."""
        frappe.cache.set_value(
            self._cache_key,
            json.dumps(state_data),
            expires_in_sec=self.recovery_timeout * 10,
        )

    def _recovery_window_elapsed(self, state):
        """True if more than ``recovery_timeout`` seconds passed since ``last_failure``."""
        return time.time() - state.get("last_failure", 0) > self.recovery_timeout

    def can_execute(self):
        """Check if the circuit allows a request.

        CLOSED: always allowed.
        OPEN: rejected until the recovery window elapses; then the breaker
            moves to HALF_OPEN and this caller goes through as the single probe.
        HALF_OPEN: rejected while a probe is outstanding. If the probe never
            reported back (no record_success / record_failure) within another
            recovery window, a new probe is allowed.
        """
        state = self._get_state()
        current = state["state"]

        if current == self.STATE_CLOSED:
            return True

        if current in (self.STATE_OPEN, self.STATE_HALF_OPEN):
            if not self._recovery_window_elapsed(state):
                return False
            # Claim the probe slot: re-stamping last_failure restarts the
            # window, so other callers are rejected until this probe resolves.
            state["state"] = self.STATE_HALF_OPEN
            state["last_failure"] = time.time()
            self._set_state(state)
            return True

        return False

    def record_success(self):
        """Reset circuit on a successful call."""
        self._set_state({
            "state": self.STATE_CLOSED,
            "failures": 0,
            "last_failure": 0,
        })

    def record_failure(self):
        """Increment the failure count and open the circuit when needed.

        The circuit opens when the threshold is reached, or immediately when
        the failed call was the HALF_OPEN probe.
        """
        state = self._get_state()
        state["failures"] = state.get("failures", 0) + 1
        state["last_failure"] = time.time()

        failed_probe = state["state"] == self.STATE_HALF_OPEN
        if failed_probe or state["failures"] >= self.failure_threshold:
            state["state"] = self.STATE_OPEN

        self._set_state(state)
class APIRateLimiter:
    """Fixed-window rate limiter for outgoing API calls.

    Counts requests per ``window_seconds``-long window in frappe.cache under
    ``api_rate_limit:<name>:<window index>``. The get-then-set in ``acquire``
    is not atomic, so concurrent workers can occasionally admit a few extra
    requests per window.
    """

    def __init__(self, name, max_requests=30, window_seconds=60):
        self.name = name
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._cache_key = f"api_rate_limit:{name}"

    def _window_index(self, now):
        """Index of the fixed window that contains timestamp ``now``."""
        return int(now // self.window_seconds)

    def acquire(self):
        """Returns True if the request is allowed, False if rate-limited."""
        key = f"{self._cache_key}:{self._window_index(time.time())}"
        current = cint(frappe.cache.get_value(key))

        if current >= self.max_requests:
            return False

        # The counter expires with its window so stale keys don't pile up.
        frappe.cache.set_value(key, current + 1, expires_in_sec=self.window_seconds)
        return True

    def wait_time(self, now=None):
        """Seconds until the current window resets, in (0, window_seconds].

        Args:
            now: timestamp to measure from; defaults to ``time.time()``. The
                clock is read once so the result cannot go negative when a
                window boundary passes mid-computation.
        """
        if now is None:
            now = time.time()
        window_start = self._window_index(now) * self.window_seconds
        return (window_start + self.window_seconds) - now
class ExternalAPIClient: """ Resilient API client with caching, circuit breaker, rate limiting, and retry.
Usage: client = ExternalAPIClient( base_url="https://api.supplier.example.com/v1", api_key="sk-xxxx", name="supplier_api", ) products = client.get("/products", cache_ttl=300) # cached GET result = client.post("/orders", json={"sku": "VAN-001", "qty": 100}) """
def __init__( self, base_url, api_key=None, name="external_api", max_retries=3, timeout=30, rate_limit_requests=30, rate_limit_window=60, circuit_failure_threshold=5, circuit_recovery_timeout=120, ): self.base_url = base_url.rstrip("/") self.api_key = api_key self.name = name self.max_retries = max_retries self.timeout = timeout self.session = requests.Session()
self.session.headers.update({ "User-Agent": "ScoopJoy-ERP/1.0", "Accept": "application/json", }) if api_key: self.session.headers["Authorization"] = f"Bearer {api_key}"
# Resilience components, both backed by frappe.cache (Redis) self.circuit = CircuitBreaker( name=name, failure_threshold=circuit_failure_threshold, recovery_timeout=circuit_recovery_timeout, ) self.rate_limiter = APIRateLimiter( name=name, max_requests=rate_limit_requests, window_seconds=rate_limit_window, )
def _cache_key(self, method, endpoint, params=None): """Generate a deterministic cache key.""" key_data = f"{method}:{self.base_url}{endpoint}:{json.dumps(params or {}, sort_keys=True)}" return f"api_cache:{self.name}:{hashlib.md5(key_data.encode()).hexdigest()}"
def _log_request(self, method, endpoint, status_code=None, error=None, duration=None): """Log an API request for debugging and monitoring.""" frappe.logger(self.name).info( f"{method} {endpoint} -> {status_code or 'ERROR'} ({duration:.2f}s)" + (f" ERROR: {error}" if error else "") )
def _request(self, method, endpoint, cache_ttl=0, **kwargs): """Core request method with all resilience patterns."""
# 1. Circuit breaker gate if not self.circuit.can_execute(): raise ConnectionError( f"Circuit breaker OPEN for {self.name}. " f"Service appears down. Retry after {self.circuit.recovery_timeout}s." )
# 2. Rate limiter gate if not self.rate_limiter.acquire(): wait = self.rate_limiter.wait_time() raise ConnectionError( f"Rate limit exceeded for {self.name}. Try again in {wait:.0f}s." )
# 3. Cache lookup (GET only) if method == "GET" and cache_ttl > 0: cache_key = self._cache_key(method, endpoint, kwargs.get("params")) cached = frappe.cache.get_value(cache_key) if cached: return json.loads(cached)
# 4. Execute with retry + exponential backoff url = f"{self.base_url}{endpoint}" last_exception = None
for attempt in range(self.max_retries + 1): start = time.time() try: response = self.session.request( method, url, timeout=self.timeout, **kwargs ) duration = time.time() - start self._log_request(method, endpoint, response.status_code, duration=duration)
response.raise_for_status() self.circuit.record_success()
result = response.json()
if method == "GET" and cache_ttl > 0: frappe.cache.set_value( cache_key, json.dumps(result), expires_in_sec=cache_ttl, )
return result
except requests.exceptions.HTTPError as e: duration = time.time() - start status = e.response.status_code if e.response else None
# Don't retry 4xx client errors (except 429) — they won't fix themselves if status and 400 <= status < 500 and status != 429: self.circuit.record_success() # Not a server failure self._log_request(method, endpoint, status, str(e), duration) raise
last_exception = e
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e: duration = time.time() - start self._log_request(method, endpoint, error=str(e), duration=duration) last_exception = e
# A server/connection failure counts against the circuit breaker self.circuit.record_failure()
# Exponential backoff before the next attempt if attempt < self.max_retries: delay = (2 ** attempt) + (time.time() % 1) # add jitter time.sleep(delay)
raise last_exception or ConnectionError( f"Failed after {self.max_retries + 1} attempts" )
def get(self, endpoint, cache_ttl=0, **kwargs): return self._request("GET", endpoint, cache_ttl=cache_ttl, **kwargs)
def post(self, endpoint, **kwargs): return self._request("POST", endpoint, **kwargs)
def put(self, endpoint, **kwargs): return self._request("PUT", endpoint, **kwargs)
def delete(self, endpoint, **kwargs): return self._request("DELETE", endpoint, **kwargs)Note how the gates run in order: circuit breaker, then rate limiter, then cache. A
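4xx client error (other than 429) calls record_success() and re-raises — a bad
request is your bug, not a sign the supplier is down, so it must not trip the
breaker. Connection errors, timeouts, 5xx, and 429 all count as failures and feed
the retry loop with jittered exponential backoff. When the retries run out, the client
re-raises the last requests exception (HTTPError, ConnectionError or Timeout from
requests.exceptions). These classes do not subclass Python's built-in ConnectionError.
A caller that wants to stop cleanly must therefore also catch
requests.exceptions.RequestException.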
Step 2: Using the client — supplier stock sync
A factory builds a client configured from ScoopJoy Settings (note the
get_password call to read the encrypted API key), and the background job pages
through the supplier’s inventory, updating matching Item records.
import frappefrom frappe.utils import fltfrom scoopjoy.utils.api_client import ExternalAPIClient
def get_supplier_client():
    """Build an ExternalAPIClient for the dairy supplier from ScoopJoy Settings.

    The API key is fetched with ``get_password`` because it is stored
    encrypted. Limits: 20 requests per 60-second window; the breaker opens
    after 3 consecutive failures and probes again after 300 seconds.
    """
    settings = frappe.get_cached_doc("ScoopJoy Settings")
    client_options = dict(
        name="dairy_supplier",
        rate_limit_requests=20,
        rate_limit_window=60,
        circuit_failure_threshold=3,
        circuit_recovery_timeout=300,
    )
    return ExternalAPIClient(
        base_url=settings.supplier_api_url,
        api_key=settings.get_password("supplier_api_key"),
        **client_options,
    )
def sync_supplier_stock(low_stock_threshold=50):
    """Background job: fetch stock levels from the supplier and update Items.

    Pages through the supplier's ``/inventory`` endpoint (100 products per
    page, each page cached for 2 minutes) until an empty page or an API
    failure. For each product it writes the stock qty, unit price and update
    time onto the Item whose ``Item Supplier.supplier_part_no`` matches the
    SKU, and raises a low-stock ToDo when the qty is below the threshold.

    Args:
        low_stock_threshold: supplier quantity below which a low-stock alert
            is created. Defaults to 50; the scheduler calls this without
            arguments.

    Returns:
        dict: ``{"synced": <items updated>, "errors": <products that failed>}``.
    """
    # Once retries are exhausted the client re-raises requests' own exceptions
    # (HTTPError / ConnectionError / Timeout). These do not subclass the
    # built-in ConnectionError, so both hierarchies are caught below.
    import requests

    client = get_supplier_client()
    page = 1
    synced = 0
    errors = []

    while True:
        try:
            data = client.get(
                "/inventory",
                params={"page": page, "per_page": 100},
                cache_ttl=120,  # cache each page for 2 min
            )
        except (ConnectionError, requests.exceptions.RequestException) as e:
            # Circuit open, rate-limited, or out of retries — stop cleanly
            frappe.log_error(title="Supplier Sync: API Error", message=str(e))
            break

        # Tolerate an empty response body (None) as well as a missing key.
        products = (data or {}).get("products") or []
        if not products:
            break

        for product in products:
            try:
                supplier_sku = product["sku"]
                available_qty = flt(product["available_qty"])
                unit_price = flt(product["unit_price"])

                # Find the matching Item by supplier SKU
                item_code = frappe.db.get_value(
                    "Item Supplier",
                    {"supplier_part_no": supplier_sku},
                    "parent",
                )
                if not item_code:
                    continue

                frappe.db.set_value("Item", item_code, {
                    "custom_supplier_stock_qty": available_qty,
                    "custom_supplier_unit_price": unit_price,
                    "custom_supplier_stock_updated_at": frappe.utils.now_datetime(),
                })

                if available_qty < low_stock_threshold:
                    _create_low_stock_alert(item_code, supplier_sku, available_qty)

                synced += 1

            except Exception as e:
                # Best effort per product: record the failure and keep going.
                errors.append(f"{product.get('sku')}: {e}")

        page += 1

    frappe.logger("supplier_sync").info(
        f"Supplier stock sync complete: {synced} items synced, {len(errors)} errors"
    )

    if errors:
        frappe.log_error(
            title="Supplier Sync Errors",
            message="\n".join(errors[:50]),  # limit log size
        )

    return {"synced": synced, "errors": len(errors)}
def _create_low_stock_alert(item_code, supplier_sku, qty): """Create a ToDo alert for the procurement team.""" existing = frappe.db.exists("ToDo", { "reference_type": "Item", "reference_name": item_code, "status": "Open", "description": ["like", "%supplier stock low%"], }) if not existing: frappe.get_doc({ "doctype": "ToDo", "description": f"Supplier stock low for {item_code} (SKU: {supplier_sku}): only {qty} units left", "reference_type": "Item", "reference_name": item_code, "allocated_to": frappe.db.get_single_value("ScoopJoy Settings", "procurement_manager"), "priority": "High", }).insert(ignore_permissions=True)When the breaker is open the very first client.get(...) raises ConnectionError,
the loop breaks immediately, and the job logs and exits instead of grinding through
hundreds of doomed retries.
Step 3: Schedule it
Register the sync as a cron-based scheduler event in the app's hooks.py so it runs every four hours (at minute 0 of hours 0, 4, 8, 12, 16 and 20). See Scheduled Tasks for more on scheduler events.
# Cron schedule "0 */4 * * *": minute 0 of every fourth hour.
scheduler_events = {
    "cron": {
        "0 */4 * * *": ["scoopjoy.integrations.supplier_sync.sync_supplier_stock"],
    },
}