ORM Power Patterns
Problem: Perform complex data operations efficiently in Frappe — without dropping to hand-written SQL where the ORM already has a fast, safe path, and without leaving correctness bugs (race conditions, deadlocks, injection) where it doesn’t.
This recipe is a tour of eight power patterns for the ScoopJoy backend, each
pairing a real problem with the right frappe.db / frappe.qb / frappe.get_all
tool. Each subheading stands on its own — jump to the one you need.
Bulk insert 10,000 records
Problem: Insert thousands of records without the overhead of calling doc.insert() for
each one.
Solution: Use frappe.db.bulk_insert for raw speed, skipping controller hooks.
import frappefrom frappe.utils import now_datetime, random_string
def bulk_insert_daily_sales(records):
    """Write daily sales snapshot rows in bulk and return how many were written.

    Rows are handed straight to ``frappe.db.bulk_insert``, which bypasses ORM
    validation, controllers, and hooks. Use it only for trusted, pre-validated
    data (e.g., imports, migrations).

    Args:
        records: Iterable of mappings, each providing ``outlet``,
            ``sales_date``, ``total_amount``, ``items_sold`` and ``footfall``.

    Returns:
        The number of rows passed to the bulk insert.
    """
    timestamp = now_datetime()
    current_user = frappe.session.user

    columns = [
        "name",
        "creation",
        "modified",
        "owner",
        "modified_by",
        "docstatus",
        "outlet",
        "sales_date",
        "total_amount",
        "items_sold",
        "footfall",
    ]

    # One list per row, in the same order as ``columns``.
    rows = [
        [
            # Sequence number plus a random suffix; names must be unique.
            f"SJDS-{seq:06d}-{random_string(4)}",
            timestamp,  # creation
            timestamp,  # modified
            current_user,  # owner
            current_user,  # modified_by
            0,  # docstatus
            entry["outlet"],
            entry["sales_date"],
            entry["total_amount"],
            entry["items_sold"],
            entry["footfall"],
        ]
        for seq, entry in enumerate(records, start=1)
    ]

    frappe.db.bulk_insert(
        "SJ Daily Sales Snapshot",
        fields=columns,
        values=rows,
        ignore_duplicates=False,
        chunk_size=5000,
    )
    frappe.db.commit()
    return len(rows)
def slow_insert_for_comparison(records):
    """Insert the same snapshot rows one document at a time, for comparison only.

    Every record goes through ``frappe.get_doc(...).insert()``. This is the
    naive approach, reported as roughly 100x slower for 10K records.
    DO NOT use in production.

    Args:
        records: Iterable of mappings, each providing ``outlet``,
            ``sales_date``, ``total_amount``, ``items_sold`` and ``footfall``.
    """
    snapshot_fields = ("outlet", "sales_date", "total_amount", "items_sold", "footfall")

    for entry in records:
        payload = {"doctype": "SJ Daily Sales Snapshot"}
        payload.update((field, entry[field]) for field in snapshot_fields)
        frappe.get_doc(payload).insert(ignore_permissions=True)

    # Single commit once every document has been inserted.
    frappe.db.commit()


# Atomic counter update
Problem: Increment a counter (e.g., visit count) without read-modify-write race conditions.
Solution: Use frappe.db.sql with SET x = x + 1 — the database handles
atomicity.
import frappe
def increment_outlet_visit_count(outlet_name, increment=1):
    """Add ``increment`` to an outlet's ``visit_count`` with a single UPDATE.

    The addition is expressed in SQL (``visit_count = visit_count + %s``), so
    Python never reads the old value first and there is no read-modify-write
    window for concurrent requests to race in.

    Args:
        outlet_name: ``name`` of the ``SJ Franchise Outlet`` row to update.
        increment: Amount to add to the counter (default 1).
    """
    statement = """
        UPDATE `tabSJ Franchise Outlet`
        SET visit_count = visit_count + %s,
            modified = NOW(),
            modified_by = %s
        WHERE name = %s
    """
    # Values travel as parameters; nothing is interpolated into the SQL text.
    params = (increment, frappe.session.user, outlet_name)
    frappe.db.sql(statement, params)
def decrement_stock_count(item_code, warehouse, qty):
    """Atomically subtract ``qty`` from a stock ledger row, refusing to go negative.

    The UPDATE only matches when ``qty_on_hand >= qty``, so the check and the
    write happen in one statement.

    Args:
        item_code: Item whose stock is reduced.
        warehouse: Warehouse holding the stock.
        qty: Positive numeric quantity to subtract.

    Raises:
        Whatever ``frappe.throw`` raises, when ``qty`` is not positive or when
        no row was changed (insufficient stock, or no matching ledger row).
    """
    # A negative qty would *add* stock and always satisfy the
    # ``qty_on_hand >= qty`` guard; zero gives no reliable row count.
    if qty <= 0:
        frappe.throw(f"Quantity to decrement must be positive, got {qty}")

    frappe.db.sql(
        """
        UPDATE `tabSJ Stock Ledger`
        SET qty_on_hand = qty_on_hand - %s,
            modified = NOW()
        WHERE item_code = %s
          AND warehouse = %s
          AND qty_on_hand >= %s
        """,
        (qty, item_code, warehouse, qty),
    )

    # ROW_COUNT() (MariaDB/MySQL) reports the rows affected by the previous
    # statement on this connection, so it must run right after the UPDATE.
    if not frappe.db.sql("SELECT ROW_COUNT()")[0][0]:
        frappe.throw(f"Insufficient stock for {item_code} in {warehouse}")


# Conditional aggregation with frappe.qb
Problem: Generate a pivot-style summary — total sales by outlet, broken down by payment mode.
Solution: Use CASE WHEN inside aggregate functions via frappe.qb.
import frappefrom frappe.query_builder.functions import Sum, Countfrom frappe.query_builder import Case
def sales_by_payment_mode():
    """Pivot invoice totals per outlet into Cash / UPI / Card columns.

    Only invoices with ``docstatus == 1`` are counted. Rows are grouped by
    ``sj_franchise_outlet`` and sorted by overall total, largest first.

    Returns:
        List of dicts with keys ``outlet``, ``total_invoices``,
        ``cash_total``, ``upi_total``, ``card_total`` and ``grand_total``.
    """
    invoice = frappe.qb.DocType("Sales Invoice")

    def mode_total(mode):
        # SUM(CASE WHEN mode_of_payment = <mode> THEN grand_total ELSE 0 END)
        matching_amount = (
            Case()
            .when(invoice.mode_of_payment == mode, invoice.grand_total)
            .else_(0)
        )
        return Sum(matching_amount)

    selected_columns = [
        invoice.sj_franchise_outlet.as_("outlet"),
        Count("*").as_("total_invoices"),
        mode_total("Cash").as_("cash_total"),
        mode_total("UPI").as_("upi_total"),
        mode_total("Card").as_("card_total"),
        Sum(invoice.grand_total).as_("grand_total"),
    ]

    pivot = (
        frappe.qb.from_(invoice)
        .select(*selected_columns)
        .where(invoice.docstatus == 1)
        .groupby(invoice.sj_franchise_outlet)
        .orderby(Sum(invoice.grand_total), order=frappe.qb.desc)
    )
    return pivot.run(as_dict=True)


# Subquery filters
Problem: Get all customers who placed at least one order above 50,000 INR.
Solution: Use a subquery with frappe.qb.
import frappefrom frappe.query_builder.functions import Maxfrom pypika.terms import ExistsCriterion
def high_value_customers(threshold=50000):
    """Return customers that have at least one Sales Invoice above ``threshold``.

    Uses an EXISTS subquery correlated on ``Sales Invoice.customer``, limited
    to invoices with ``docstatus == 1``.

    Args:
        threshold: ``grand_total`` an invoice must exceed (default 50000).

    Returns:
        List of dicts with ``name``, ``customer_name`` and ``territory``.
    """
    customer = frappe.qb.DocType("Customer")
    invoice = frappe.qb.DocType("Sales Invoice")

    # Correlated subquery: any one qualifying invoice for the outer customer.
    qualifying_invoice = (
        frappe.qb.from_(invoice)
        .select(invoice.name)
        .where(
            (invoice.customer == customer.name)
            & (invoice.docstatus == 1)
            & (invoice.grand_total > threshold)
        )
        .limit(1)
    )

    matches = (
        frappe.qb.from_(customer)
        .select(customer.name, customer.customer_name, customer.territory)
        .where(ExistsCriterion(qualifying_invoice))
    )
    return matches.run(as_dict=True)
def customers_with_max_order_value():
    """Return up to 50 customers with their largest invoice total.

    The maximum is computed by a correlated scalar subquery over Sales
    Invoices with ``docstatus == 1``. The same subquery is used for ordering,
    largest first.

    Returns:
        List of dicts with ``name``, ``customer_name`` and ``max_order_value``.
    """
    customer = frappe.qb.DocType("Customer")
    invoice = frappe.qb.DocType("Sales Invoice")

    # MAX(grand_total) over the outer customer's invoices.
    largest_invoice = (
        frappe.qb.from_(invoice)
        .select(Max(invoice.grand_total))
        .where((invoice.customer == customer.name) & (invoice.docstatus == 1))
    )

    ranked = frappe.qb.from_(customer).select(
        customer.name,
        customer.customer_name,
        largest_invoice.as_("max_order_value"),
    )
    ranked = ranked.orderby(largest_invoice, order=frappe.qb.desc).limit(50)
    return ranked.run(as_dict=True)


# EXISTS vs JOIN — when to use which
Problem: You need to filter a parent table based on a condition in a
child/related table. Should you use EXISTS or JOIN?
Solution: Rules of thumb with benchmarkable examples — use EXISTS when you
only need parent columns and a boolean “does at least one child match?” check; use
JOIN when you need columns or aggregates from both tables.
import frappefrom pypika.terms import ExistsCriterion
def outlets_with_complaints_exists():
    """Return outlets that have at least one unresolved complaint note.

    Reach for EXISTS when:
    - only parent columns are selected (no child columns in SELECT);
    - a parent can have many matching child rows and must not be duplicated;
    - the question is a yes/no "does at least one child match?".

    EXISTS only tests for the presence of a match, so each outlet appears at
    most once however many notes qualify.

    Returns:
        List of dicts with ``name``, ``outlet_name`` and ``city``.
    """
    outlet = frappe.qb.DocType("SJ Franchise Outlet")
    note = frappe.qb.DocType("SJ Note")

    # Correlated on the outer outlet's name.
    open_complaint = (
        frappe.qb.from_(note)
        .select(note.name)
        .where(
            (note.reference_doctype == "SJ Franchise Outlet")
            & (note.reference_name == outlet.name)
            & (note.note_type == "Complaint")
            & (note.resolved == 0)
        )
    )

    flagged_outlets = (
        frappe.qb.from_(outlet)
        .select(outlet.name, outlet.outlet_name, outlet.city)
        .where(ExistsCriterion(open_complaint))
    )
    return flagged_outlets.run(as_dict=True)
def outlets_with_complaint_details_join():
    """Return one row per unresolved complaint, with outlet and note columns.

    Reach for JOIN when:
    - columns from both tables are needed in SELECT;
    - child data must be aggregated (COUNT, SUM);
    - the relationship is 1:1 or nearly 1:1.

    For 1:N relationships a JOIN repeats the parent once per matching child;
    add GROUP BY or DISTINCT if that is not wanted.

    Returns:
        List of dicts with ``name``, ``outlet_name``, ``complaint_subject``
        and ``complaint_date``, newest complaint first.
    """
    outlet = frappe.qb.DocType("SJ Franchise Outlet")
    note = frappe.qb.DocType("SJ Note")

    # Notes point at their outlet through (reference_doctype, reference_name).
    belongs_to_outlet = (note.reference_doctype == "SJ Franchise Outlet") & (
        note.reference_name == outlet.name
    )

    complaints = (
        frappe.qb.from_(outlet)
        .join(note)
        .on(belongs_to_outlet)
        .select(
            outlet.name,
            outlet.outlet_name,
            note.subject.as_("complaint_subject"),
            note.creation.as_("complaint_date"),
        )
        .where(note.note_type == "Complaint")
        .where(note.resolved == 0)
        .orderby(note.creation, order=frappe.qb.desc)
    )
    return complaints.run(as_dict=True)


# frappe.get_all with complex filters
Problem: Use or_filters, nested conditions, and advanced operators.
Solution: Leverage Frappe’s filter syntax — lists of
[doctype, field, operator, value].
import frappe
def search_outlets_complex():
    """Run five ``frappe.get_all`` queries that showcase its filter syntax.

    Returns:
        Dict of result lists keyed by ``basic_and``, ``with_or``,
        ``like_pattern``, ``null_checks`` and ``nested_explicit``.
    """
    doctype = "SJ Franchise Outlet"

    # Dict filters: every entry must hold (AND); values are [operator, operand].
    and_filters = {
        "city": ["in", ["Mumbai", "Pune", "Nagpur"]],
        "status": ["!=", "Closed"],
        "monthly_rent": [">=", 50000],
        "agreement_date": ["between", ["2025-01-01", "2026-12-31"]],
    }
    basic_and = frappe.get_all(
        doctype,
        filters=and_filters,
        fields=["name", "outlet_name", "city", "monthly_rent"],
    )

    # or_filters: any ONE of these is enough, combined with the AND filters.
    any_of = {
        "city": "Mumbai",
        "is_flagship": 1,
        "monthly_revenue": [">", 500000],
    }
    with_or = frappe.get_all(
        doctype,
        filters={"status": "Active"},
        or_filters=any_of,
        fields=["name", "outlet_name", "city"],
    )

    # Pattern matching with LIKE / NOT LIKE.
    pattern_filters = {
        "outlet_name": ["like", "%Premium%"],
        "city": ["not like", "%Test%"],
    }
    like_pattern = frappe.get_all(
        doctype,
        filters=pattern_filters,
        fields=["name", "outlet_name"],
    )

    # Presence checks via the "is" operator.
    presence_filters = {
        "closed_date": ["is", "not set"],  # field has no value
        "agreement_date": ["is", "set"],  # field has a value
    }
    null_checks = frappe.get_all(
        doctype,
        filters=presence_filters,
        fields=["name", "outlet_name"],
    )

    # List-style filters name the doctype explicitly: [doctype, field, op, value].
    explicit_filters = [
        ["SJ Franchise Outlet", "city", "in", ["Mumbai", "Delhi"]],
        ["SJ Franchise Outlet", "creation", ">", "2025-06-01"],
    ]
    nested_explicit = frappe.get_all(
        doctype,
        filters=explicit_filters,
        fields=["name", "outlet_name", "city", "creation"],
        order_by="creation desc",
        limit_page_length=50,
    )

    return {
        "basic_and": basic_and,
        "with_or": with_or,
        "like_pattern": like_pattern,
        "null_checks": null_checks,
        "nested_explicit": nested_explicit,
    }


# Raw SQL with parameterized queries
Problem: Sometimes you need raw SQL for complex queries, but must avoid SQL injection.
Solution: ALWAYS use %s placeholders with frappe.db.sql. NEVER use
f-strings or .format() for values.
import frappe
def raw_query_safe(city, min_revenue, start_date, end_date):
    """CORRECT: revenue per outlet in a city, using a parameterized query.

    Every value is passed through ``%s`` placeholders, never spliced into the
    SQL text.

    Args:
        city: Outlet city to match.
        min_revenue: Minimum summed ``grand_total`` an outlet must reach.
        start_date: Start of the ``posting_date`` range (BETWEEN, inclusive).
        end_date: End of the ``posting_date`` range (BETWEEN, inclusive).

    Returns:
        List of dicts with ``name``, ``outlet_name``, ``city``,
        ``total_revenue`` and ``invoice_count``, highest revenue first.
    """
    statement = """
        SELECT
            o.name,
            o.outlet_name,
            o.city,
            SUM(si.grand_total) as total_revenue,
            COUNT(si.name) as invoice_count
        FROM `tabSJ Franchise Outlet` o
        INNER JOIN `tabSales Invoice` si
            ON si.sj_franchise_outlet = o.name
        WHERE o.city = %s
          AND si.docstatus = 1
          AND si.posting_date BETWEEN %s AND %s
        GROUP BY o.name
        HAVING total_revenue >= %s
        ORDER BY total_revenue DESC
    """
    # Order must follow the placeholders: city, date range, then HAVING bound.
    params = (city, start_date, end_date, min_revenue)
    return frappe.db.sql(statement, params, as_dict=True)
def raw_query_DANGEROUS(city, min_revenue):
    """WRONG: SQL injection vulnerability. NEVER DO THIS.

    This function is a warning, not an implementation. The commented-out query
    below splices ``city`` into the SQL text, so a value such as
    "Mumbai'; DROP TABLE `tabSales Invoice`; --" becomes part of the statement
    instead of being treated as data.

    Args:
        city: Unused; shown only to illustrate the unsafe interpolation.
        min_revenue: Unused.

    Raises:
        NotImplementedError: Always. A builtin is raised because the
            previously referenced ``SecurityError`` is not defined anywhere.
    """
    # return frappe.db.sql(
    #     f"SELECT * FROM `tabSJ Franchise Outlet` WHERE city = '{city}'"
    # )
    raise NotImplementedError(
        "This function exists only as a warning. Never use string interpolation in SQL."
    )
def raw_query_with_dynamic_table(doctype_name, filters):
    """Return the current user's 20 newest rows from a dynamically named table.

    Table names cannot be bound as query parameters, so the DocType is first
    checked against the ``DocType`` table. Only then is its name placed in the
    SQL text. The ``owner`` value is still passed as a parameter.

    Args:
        doctype_name: DocType whose ``tab<DocType>`` table is queried.
        filters: Accepted but not used by this implementation.

    Returns:
        List of dicts with ``name`` and ``creation``, newest first.

    Raises:
        Whatever ``frappe.throw`` raises, when the DocType does not exist.
    """
    # Guard against table-name injection: only known DocTypes get through.
    doctype_is_known = frappe.db.exists("DocType", doctype_name)
    if not doctype_is_known:
        frappe.throw(f"Invalid DocType: {doctype_name}")

    table = f"tab{doctype_name}"
    statement = (
        f"SELECT name, creation FROM `{table}` WHERE owner = %s ORDER BY creation DESC LIMIT 20"
    )
    return frappe.db.sql(statement, (frappe.session.user,), as_dict=True)


# Database-level locking
Problem: Two users simultaneously approve the same leave application, causing double deduction from the leave balance.
Solution: Use SELECT ... FOR UPDATE to lock the row during the transaction.
import frappefrom frappe import _
def deduct_leave_balance(employee, leave_type, days):
    """Subtract ``days`` from an employee's leave balance under a row lock.

    ``SELECT ... FOR UPDATE`` locks the balance row until the surrounding
    transaction ends. Another transaction that tries to lock the same row
    waits until then, so it sees the reduced balance. This function does not
    commit; the lock is released when the transaction commits (at the end of
    the request or on an explicit commit).

    Args:
        employee: Employee whose balance is reduced.
        leave_type: Leave type of the balance row.
        days: Number of days to deduct.

    Returns:
        The remaining balance after the deduction.

    Raises:
        Whatever ``frappe.throw`` raises, when no balance row exists or the
        balance is smaller than ``days``.
    """
    # Read and lock in one step.
    locked_rows = frappe.db.sql(
        """
        SELECT name, remaining_leaves
        FROM `tabSJ Leave Balance`
        WHERE employee = %s AND leave_type = %s
        FOR UPDATE
        """,
        (employee, leave_type),
        as_dict=True,
    )
    if not locked_rows:
        frappe.throw(_("No leave balance found for {0} - {1}").format(employee, leave_type))

    row = locked_rows[0]
    available = row.remaining_leaves
    if available < days:
        frappe.throw(
            _("Insufficient leave balance. Available: {0}, Requested: {1}").format(available, days)
        )

    # The row is still locked here, so the balance just read cannot have changed.
    update_params = (days, frappe.session.user, row.name)
    frappe.db.sql(
        """
        UPDATE `tabSJ Leave Balance`
        SET remaining_leaves = remaining_leaves - %s,
            modified = NOW(),
            modified_by = %s
        WHERE name = %s
        """,
        update_params,
    )
    return available - days
def transfer_stock_between_warehouses(item_code, from_wh, to_wh, qty):
    """Move ``qty`` of an item between two warehouses under row locks.

    IMPORTANT: rows are always locked in a consistent order (sorted by
    warehouse name) so that two transfers running in opposite directions
    cannot deadlock on each other.

    Both ledger rows must exist before anything is written. Otherwise the
    source would be debited while the credit UPDATE matched no row, and the
    stock would silently disappear.

    Args:
        item_code: Item being transferred.
        from_wh: Source warehouse.
        to_wh: Destination warehouse.
        qty: Positive numeric quantity to move.

    Raises:
        Whatever ``frappe.throw`` raises, when ``qty`` is not positive, when
        either warehouse has no ledger row for the item, or when the source
        does not hold at least ``qty``.
    """
    # A negative qty would reverse the transfer and skip the stock check on
    # the warehouse that actually loses stock.
    if qty <= 0:
        frappe.throw(_("Transfer quantity must be positive"))

    # Lock both rows in sorted order, and confirm each one exists.
    for wh in sorted([from_wh, to_wh]):
        locked = frappe.db.sql(
            """
            SELECT name
            FROM `tabSJ Stock Ledger`
            WHERE item_code = %s AND warehouse = %s
            FOR UPDATE
            """,
            (item_code, wh),
        )
        if not locked:
            frappe.throw(_("No stock ledger row for {0} in {1}").format(item_code, wh))

    # Both rows are locked, so it is safe to update. Deduct from the source;
    # the qty_on_hand guard stops it going negative.
    frappe.db.sql(
        """
        UPDATE `tabSJ Stock Ledger`
        SET qty_on_hand = qty_on_hand - %s,
            modified = NOW()
        WHERE item_code = %s
          AND warehouse = %s
          AND qty_on_hand >= %s
        """,
        (qty, item_code, from_wh, qty),
    )

    # ROW_COUNT() (MariaDB/MySQL) reports the rows affected by the previous
    # statement on this connection, so it must directly follow the UPDATE.
    affected = frappe.db.sql("SELECT ROW_COUNT() as cnt")[0][0]
    if not affected:
        frappe.throw(_("Insufficient stock in {0}").format(from_wh))

    # Add to the destination.
    frappe.db.sql(
        """
        UPDATE `tabSJ Stock Ledger`
        SET qty_on_hand = qty_on_hand + %s,
            modified = NOW()
        WHERE item_code = %s AND warehouse = %s
        """,
        (qty, item_code, to_wh),
    )