Database Transaction Patterns
Problem: A ScoopJoy franchise provisioning routine touches several documents at once — warehouses, cost centers, loyalty programs, stock bins. You need partial rollback for the optional parts, atomic stock updates that survive concurrency, deadlock recovery, and a clean way to pass data between lifecycle hooks.
Solution: Lean on Frappe’s transaction primitives — frappe.db.savepoint,
SELECT ... FOR UPDATE, a deadlock-retry wrapper, and frappe.flags — and know
exactly when Frappe commits for you versus when you must commit yourself.
Pattern 1: Savepoint for partial rollback
Required infrastructure must all succeed or the whole request rolls back. Optional extras are wrapped in their own savepoint, so a failure rolls back only that block and the routine carries on.
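The savepoint behaviour described above can be tried in isolation, without a Frappe site. The following is a minimal sketch that uses the standard library's sqlite3 module as a stand-in for frappe.db; the `infra` table and the `provision` and `demo` functions are hypothetical names chosen for illustration, and the SQL statements are issued by hand rather than through Frappe's helpers.

```python
import sqlite3


def provision(conn, fail_optional):
    """Insert a required row, then an optional row guarded by a savepoint."""
    cur = conn.cursor()
    cur.execute("BEGIN")

    # Required step: part of the outer transaction.
    cur.execute("INSERT INTO infra (name) VALUES ('warehouse')")

    # Optional step: undone on failure without touching the required row.
    cur.execute("SAVEPOINT loyalty_setup")
    try:
        cur.execute("INSERT INTO infra (name) VALUES ('loyalty')")
        if fail_optional:
            raise RuntimeError("simulated failure in the optional step")
        cur.execute("RELEASE SAVEPOINT loyalty_setup")
    except RuntimeError:
        cur.execute("ROLLBACK TO SAVEPOINT loyalty_setup")

    cur.execute("COMMIT")
    return [row[0] for row in conn.execute("SELECT name FROM infra ORDER BY id")]


def demo(fail_optional):
    # isolation_level=None puts sqlite3 in autocommit mode, so the explicit
    # BEGIN / SAVEPOINT / COMMIT statements above control the transaction.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE infra (id INTEGER PRIMARY KEY, name TEXT)")
    try:
        return provision(conn, fail_optional)
    finally:
        conn.close()
```

Calling `demo(True)` leaves only the required row behind, while `demo(False)` keeps both. The same shape applies to the Frappe routine in this pattern.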
import frappe
from frappe import _


def provision_franchise_with_optional_extras(agreement_name):
    """
    Create required infrastructure (must succeed) and optional extras (can fail).
    Uses savepoints to allow partial rollback of the optional parts.
    """
    agreement = frappe.get_doc("Franchise Agreement", agreement_name)

    # ── Required: these must all succeed or the whole thing rolls back ──
    create_warehouse(agreement)
    create_cost_center(agreement)

    # ── Optional: loyalty program setup; failure is acceptable ──
    try:
        frappe.db.savepoint("loyalty_setup")
        create_loyalty_program(agreement)
    except Exception as e:
        frappe.db.rollback(save_point="loyalty_setup")
        frappe.logger("scoopjoy").warning(
            f"Loyalty setup failed for {agreement_name}, continuing: {e}"
        )

    # ── Optional: marketing templates; failure is acceptable ──
    try:
        frappe.db.savepoint("marketing_setup")
        create_marketing_templates(agreement)
    except Exception as e:
        frappe.db.rollback(save_point="marketing_setup")
        frappe.logger("scoopjoy").warning(
            f"Marketing setup failed for {agreement_name}, continuing: {e}"
        )

    # At this point: warehouse + cost center are guaranteed.
    # Loyalty and marketing may or may not exist.
    frappe.db.commit()


def create_warehouse(agreement):
    abbr = frappe.db.get_value("Company", agreement.company, "abbr")
    frappe.get_doc({
        "doctype": "Warehouse",
        "warehouse_name": f"{agreement.franchise_name} - {agreement.territory}",
        "company": agreement.company,
        "parent_warehouse": f"Stores - {abbr}",
    }).insert(ignore_permissions=True)


def create_cost_center(agreement):
    abbr = frappe.db.get_value("Company", agreement.company, "abbr")
    frappe.get_doc({
        "doctype": "Cost Center",
        "cost_center_name": f"{agreement.franchise_name} - {agreement.territory}",
        "company": agreement.company,
        "parent_cost_center": f"{agreement.company} - {abbr}",
    }).insert(ignore_permissions=True)


def create_loyalty_program(agreement):
    frappe.get_doc({
        "doctype": "Loyalty Program",
        "loyalty_program_name": f"ScoopJoy Rewards - {agreement.territory}",
        "company": agreement.company,
        "custom_franchise_agreement": agreement.name,
        "collection_rules": [{
            "tier_name": "Bronze",
            "collection_factor": 1,
            "minimum_total_spent": 0,
        }],
    }).insert(ignore_permissions=True)


def create_marketing_templates(agreement):
    frappe.get_doc({
        "doctype": "Email Template",
        "name": f"Welcome - {agreement.franchise_name}",
        "subject": f"Welcome to ScoopJoy {agreement.franchise_name}!",
        "response": f"<p>Dear Customer, welcome to ScoopJoy {agreement.franchise_name}!</p>",
    }).insert(ignore_permissions=True)

Pattern 2: Atomic operations with frappe.db.sql
A stock transfer reads a balance, checks it, then writes. If two transfers read the same balance concurrently, both can pass the check and oversell. SELECT ... FOR UPDATE takes a row-level lock so the second transfer waits for the first to finish.
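The oversell is easier to see with the interleaving written out step by step. The sketch below is plain Python with no database: `interleaved_without_lock` replays the unlucky ordering in which both transfers read before either writes, and `serialized_with_lock` replays what a row lock enforces, where the second transfer only reads after the first has finished. Both function names are hypothetical and exist only for this illustration.

```python
def interleaved_without_lock(balance, qty_a, qty_b):
    """Both transfers read the balance before either one writes."""
    read_a = balance
    read_b = balance

    # Each check passes against the same stale value.
    if read_a >= qty_a:
        balance -= qty_a
    if read_b >= qty_b:
        balance -= qty_b
    return balance


def serialized_with_lock(balance, qty_a, qty_b):
    """Each transfer reads only after the previous one has finished."""
    for qty in (qty_a, qty_b):
        # The check sees the balance left behind by the earlier transfer.
        if balance >= qty:
            balance -= qty
    return balance
```

With a starting balance of 10 and two transfers of 8, the unlocked ordering ends at -6, while the serialized ordering rejects the second transfer and ends at 2.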
import frappe
from frappe.utils import flt


def atomic_stock_transfer(source_warehouse, target_warehouse, item_code, qty):
    """
    Move stock between two Bin rows without a read-check-write race.
    The source row is locked with SELECT ... FOR UPDATE, so a concurrent
    transfer cannot read the same balance until this transaction ends.
    """
    # Verify sufficient stock with a row-level lock (SELECT ... FOR UPDATE)
    current_qty = frappe.db.sql(
        """
        SELECT actual_qty
        FROM `tabBin`
        WHERE warehouse = %(warehouse)s AND item_code = %(item_code)s
        FOR UPDATE
        """,
        {"warehouse": source_warehouse, "item_code": item_code},
        as_dict=True,
    )

    if not current_qty or flt(current_qty[0].actual_qty) < flt(qty):
        frappe.throw(
            f"Insufficient stock in {source_warehouse}: "
            f"need {qty}, have {flt(current_qty[0].actual_qty) if current_qty else 0}"
        )

    # Decrement the source, then increment the target. Each UPDATE computes
    # the new value inside the database, and both run in the same transaction.
    frappe.db.sql(
        """
        UPDATE `tabBin`
        SET actual_qty = actual_qty - %(qty)s
        WHERE warehouse = %(warehouse)s AND item_code = %(item_code)s
        """,
        {"qty": qty, "warehouse": source_warehouse, "item_code": item_code},
    )

    # Note: this UPDATE changes nothing if the target Bin row does not exist yet.
    frappe.db.sql(
        """
        UPDATE `tabBin`
        SET actual_qty = actual_qty + %(qty)s
        WHERE warehouse = %(warehouse)s AND item_code = %(item_code)s
        """,
        {"qty": qty, "warehouse": target_warehouse, "item_code": item_code},
    )

Pattern 3: Deadlock retry
Under high concurrency, MariaDB may abort a transaction with a deadlock or lock-wait timeout. The fix is not to avoid it but to retry with exponential backoff. The wrapper rolls back the failed attempt before sleeping, then re-runs the operation.
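The retry order (roll back, then sleep, then run again) can be checked without a database by injecting the side effects. This is a minimal sketch under that assumption: `retry_on_deadlock`, its `rollback` and `sleep` parameters, and `run_flaky_demo` are hypothetical names for illustration, standing in for frappe.db.rollback and time.sleep.

```python
def retry_on_deadlock(fn, rollback, sleep, max_retries=3, base_delay=0.5):
    """Run fn, retrying on deadlock-like errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as exc:
            message = str(exc).lower()
            retryable = "deadlock" in message or "lock wait timeout" in message
            # Give up on unrelated errors and on the final attempt.
            if not retryable or attempt == max_retries - 1:
                raise
            rollback()  # release the failed attempt's locks first
            sleep(base_delay * (2 ** attempt))  # 0.5s, 1.0s, 2.0s, ...


def run_flaky_demo():
    """Fail twice with a deadlock message, then succeed; record side effects."""
    events = []
    calls = {"count": 0}

    def flaky():
        calls["count"] += 1
        if calls["count"] < 3:
            raise RuntimeError("Deadlock found when trying to get lock")
        return "done"

    result = retry_on_deadlock(
        flaky,
        rollback=lambda: events.append("rollback"),
        sleep=lambda seconds: events.append(("sleep", seconds)),
    )
    return result, events
```

With the default settings there are at most two sleeps (0.5s and 1.0s), because the third failure is re-raised instead of being retried.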
import frappe
import time
from frappe import _


def with_deadlock_retry(fn, max_retries=3, base_delay=0.5):
    """
    Retry a database operation if a deadlock is detected.
    Uses exponential backoff between retries.
    """
    last_error = None

    for attempt in range(max_retries):
        try:
            result = fn()
            return result
        except Exception as e:
            error_str = str(e).lower()
            is_deadlock = "deadlock" in error_str or "lock wait timeout" in error_str

            if not is_deadlock or attempt == max_retries - 1:
                raise

            last_error = e
            delay = base_delay * (2 ** attempt)
            frappe.logger("scoopjoy").warning(
                f"Deadlock detected on attempt {attempt + 1}/{max_retries}. "
                f"Retrying in {delay}s: {e}"
            )
            # Roll back before sleeping so the failed attempt's locks are
            # released while this worker waits.
            frappe.db.rollback()
            time.sleep(delay)

    raise last_error


# Usage example:
def bulk_update_prices(price_list, items_with_prices):
    """Update many item prices; may deadlock under high concurrency."""

    def do_update():
        for item in items_with_prices:
            existing = frappe.db.exists(
                "Item Price",
                {"price_list": price_list, "item_code": item["item_code"]},
            )
            if existing:
                frappe.db.set_value(
                    "Item Price", existing, "price_list_rate", item["rate"]
                )
            else:
                frappe.get_doc({
                    "doctype": "Item Price",
                    "price_list": price_list,
                    "item_code": item["item_code"],
                    "price_list_rate": item["rate"],
                }).insert(ignore_permissions=True)
        frappe.db.commit()

    with_deadlock_retry(do_update)

Pattern 4: frappe.flags for cross-function communication
frappe.flags is a per-request scratchpad, cleared automatically when the request
ends. Use it to hand a value computed in validate() to on_submit() without
re-querying the database or stashing state on the document.
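Because the scratchpad is shared by everything that runs in the same request, a value stashed for one document stays visible to the next one processed in that request. The sketch below models this with a plain dict standing in for frappe.flags. The `flags` dict, the document dicts, and the idea of keying the entry by document name and consuming it with `pop` are illustrative assumptions, not something the pattern in this section prescribes.

```python
flags = {}  # stand-in for frappe.flags: one shared scratchpad per request


def validate(doc):
    """Compute the royalty once and stash it under a per-document key."""
    if doc.get("agreement"):
        royalty = doc["grand_total"] * doc["royalty_pct"] / 100
        flags[("royalty", doc["name"])] = royalty


def on_submit(doc):
    """Consume the stashed value so it cannot leak to another document."""
    royalty = flags.pop(("royalty", doc["name"]), 0)
    if royalty > 0:
        doc["custom_royalty_amount"] = royalty
    return doc


def process_request(docs):
    """Run both hooks for every document handled in one simulated request."""
    flags.clear()
    for doc in docs:
        validate(doc)
        on_submit(doc)
    return docs
```

Running a franchise sale followed by a non-franchise sale through `process_request` sets the royalty on the first document only, and the scratchpad is empty afterwards.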
import frappe
from frappe import _
from frappe.utils import flt
from erpnext.accounts.doctype.pos_invoice.pos_invoice import POSInvoice


# The POSInvoice base class is assumed from the class name; the super() calls
# below need a parent class that defines validate, on_submit and on_cancel.
class ScoopJoyPOSInvoice(POSInvoice):
    """
    Use frappe.flags to pass data between hooks within the same request
    without polluting the document or using globals.
    """

    def validate(self):
        super().validate()

        # Calculate and stash royalty for use in on_submit
        if self.custom_franchise_agreement:
            royalty_pct = frappe.db.get_value(
                "Franchise Agreement",
                self.custom_franchise_agreement,
                "royalty_percentage",
            )
            # Store in flags: available within this request only, cleared automatically
            frappe.flags.scoopjoy_royalty_amount = flt(self.grand_total) * flt(royalty_pct) / 100
            frappe.flags.scoopjoy_is_franchise_sale = True

    def on_submit(self):
        super().on_submit()

        # Read the value calculated in validate; no extra DB query needed
        if frappe.flags.get("scoopjoy_is_franchise_sale"):
            royalty = frappe.flags.get("scoopjoy_royalty_amount", 0)
            if royalty > 0:
                self.db_set("custom_royalty_amount", royalty)

    def on_cancel(self):
        super().on_cancel()

        # Prevent double-processing if another hook re-triggers
        frappe.flags.scoopjoy_cancelling_pos = True

Pattern 5: When frappe.db.commit() is needed vs automatic
The single most common transaction bug is committing in the wrong place. Frappe already wraps web requests and doc events in a transaction; background work runs outside that lifecycle and must commit itself.
| Context | Commit behaviour |
|---|---|
| Web requests (GET/POST) | Automatic on success; do not call commit() |
| doc_events (validate, on_submit, on_cancel, …) | Automatic; never call commit() |
| Whitelisted methods called via HTTP | Automatic on success |
| Background jobs (frappe.enqueue) | Manual; you must call frappe.db.commit() |
| Scheduled tasks (scheduler_events) | Manual |
| Bench console sessions | Manual |
| Management commands (bench execute) | Manual |
import frappe


# Example: Background job that NEEDS explicit commits
def reindex_franchise_data():
    """Background job: must commit explicitly."""
    agreements = frappe.get_all("Franchise Agreement", pluck="name")

    for i, name in enumerate(agreements):
        frappe.db.set_value(
            "Franchise Agreement", name, "custom_last_reindex", frappe.utils.now_datetime()
        )
        if (i + 1) % 100 == 0:
            frappe.db.commit()

    frappe.db.commit()  # Final commit for remaining items


# Example: Web request handler that does NOT need commit
@frappe.whitelist()
def update_franchise_status(agreement, status):
    """Whitelisted method: Frappe auto-commits on success."""
    frappe.db.set_value("Franchise Agreement", agreement, "agreement_status", status)
    # NO frappe.db.commit() here; Frappe handles it
    return {"status": "updated"}
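The commit-every-100 cadence used in the background job can be factored into a small helper and exercised without a database. This is a sketch only: `process_in_batches` and `count_commits` are hypothetical names, and `handle` and `commit` are injected callables standing in for the per-item update and frappe.db.commit. Unlike the job above, this variant skips the final commit when nothing is pending.

```python
def process_in_batches(items, handle, commit, batch_size=100):
    """Call handle on every item, committing after each full batch."""
    pending = 0
    for item in items:
        handle(item)
        pending += 1
        if pending == batch_size:
            commit()
            pending = 0
    # Commit the remainder, if any work is still uncommitted.
    if pending:
        commit()


def count_commits(total_items, batch_size=100):
    """Return how many commits a run over total_items would issue."""
    commits = []
    process_in_batches(
        range(total_items),
        handle=lambda item: None,
        commit=lambda: commits.append(1),
        batch_size=batch_size,
    )
    return len(commits)
```

For 250 items this issues three commits (after items 100 and 200, plus one for the remaining 50). For exactly 200 items it issues two.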