Skip to content

ORM Power Patterns

Problem: Perform complex data operations efficiently in Frappe — without dropping to hand-written SQL where the ORM already has a fast, safe path, and without leaving correctness bugs (race conditions, deadlocks, injection) where it doesn’t.

This recipe is a tour of eight power patterns for the ScoopJoy backend, each pairing a real problem with the right frappe.db / frappe.qb / frappe.get_all tool. Each subheading stands on its own — jump to the one you need.

Problem: Insert thousands of records without the overhead of doc.insert() for each.

Solution: Use frappe.db.bulk_insert for raw speed, skipping controller hooks.

scoopjoy/scoopjoy/utils/bulk_ops.py
import frappe
from frappe.utils import now_datetime, random_string
def bulk_insert_daily_sales(records):
    """Insert a large batch of daily sales snapshot rows in one bulk operation.

    frappe.db.bulk_insert bypasses ORM validation, controllers, and hooks.
    Use only for trusted, pre-validated data (e.g., imports, migrations).
    The transaction is committed before returning.

    Args:
        records: Iterable of mappings, each providing ``outlet``,
            ``sales_date``, ``total_amount``, ``items_sold`` and ``footfall``.

    Returns:
        The number of rows handed to the bulk insert.

    Raises:
        KeyError: If a record is missing one of the required keys.
    """
    now = now_datetime()
    user = frappe.session.user
    fields = [
        "name", "creation", "modified", "owner", "modified_by", "docstatus",
        "outlet", "sales_date", "total_amount", "items_sold", "footfall",
    ]
    values = [
        [
            # The positional index keeps names unique inside this batch. The
            # index restarts at 1 on every call, so the random suffix is what
            # separates batches; 10 characters keeps a cross-run collision
            # (which would abort the insert, since ignore_duplicates=False)
            # practically impossible.
            f"SJDS-{position:06d}-{random_string(10)}",
            now,   # creation
            now,   # modified
            user,  # owner
            user,  # modified_by
            0,     # docstatus
            rec["outlet"],
            rec["sales_date"],
            rec["total_amount"],
            rec["items_sold"],
            rec["footfall"],
        ]
        for position, rec in enumerate(records, start=1)
    ]
    frappe.db.bulk_insert(
        "SJ Daily Sales Snapshot",
        fields=fields,
        values=values,
        ignore_duplicates=False,
        chunk_size=5000,
    )
    frappe.db.commit()
    return len(values)
def slow_insert_for_comparison(records):
    """Baseline for comparison: insert one document per record through the ORM.

    Every record goes through frappe.get_doc(...).insert(), so this is the
    naive, slow path. DO NOT use in production. A single commit is issued
    after all records have been inserted.
    """
    snapshot_fields = ("outlet", "sales_date", "total_amount", "items_sold", "footfall")
    for record in records:
        payload = {"doctype": "SJ Daily Sales Snapshot"}
        for fieldname in snapshot_fields:
            payload[fieldname] = record[fieldname]
        frappe.get_doc(payload).insert(ignore_permissions=True)
    frappe.db.commit()

Problem: Increment a counter (e.g., visit count) without read-modify-write race conditions.

Solution: Use frappe.db.sql with SET x = x + 1 — the database handles atomicity.

scoopjoy/scoopjoy/utils/counters.py
import frappe
def increment_outlet_visit_count(outlet_name, increment=1):
    """Add ``increment`` to an outlet's visit_count in a single UPDATE.

    The addition happens inside the database statement itself, so there is
    no separate read step for concurrent requests to race on. ``modified``
    and ``modified_by`` are refreshed in the same statement.
    """
    update_sql = """
        UPDATE `tabSJ Franchise Outlet`
        SET visit_count = visit_count + %s,
            modified = NOW(),
            modified_by = %s
        WHERE name = %s
    """
    params = (increment, frappe.session.user, outlet_name)
    # One statement reads and writes the counter; no prior SELECT is needed.
    frappe.db.sql(update_sql, params)
def decrement_stock_count(item_code, warehouse, qty):
    """Atomically subtract ``qty`` from a stock ledger row, refusing to underflow.

    The UPDATE only matches when ``qty_on_hand >= qty``, so the check and the
    write happen in one statement.

    Args:
        item_code: Item whose stock is reduced.
        warehouse: Warehouse holding the stock.
        qty: Non-negative quantity to subtract.

    Raises:
        Whatever frappe.throw raises, when ``qty`` is negative or when no row
        was updated (missing row or insufficient stock).
    """
    # A negative qty would trivially satisfy "qty_on_hand >= qty" and turn the
    # decrement into an unguarded increment, so reject it up front.
    if float(qty) < 0:
        frappe.throw(f"Quantity to decrement cannot be negative: {qty}")

    frappe.db.sql(
        """
        UPDATE `tabSJ Stock Ledger`
        SET qty_on_hand = qty_on_hand - %s,
            modified = NOW()
        WHERE item_code = %s
          AND warehouse = %s
          AND qty_on_hand >= %s
        """,
        (qty, item_code, warehouse, qty),
    )
    # ROW_COUNT() (MariaDB/MySQL) reports how many rows the previous statement
    # on this connection changed; zero means the guard in WHERE did not match.
    if not frappe.db.sql("SELECT ROW_COUNT()")[0][0]:
        frappe.throw(f"Insufficient stock for {item_code} in {warehouse}")

Problem: Generate a pivot-style summary — total sales by outlet, broken down by payment mode.

Solution: Use CASE WHEN inside aggregate functions via frappe.qb.

scoopjoy/scoopjoy/utils/reports.py
import frappe
from frappe.query_builder.functions import Sum, Count
from frappe.query_builder import Case
def sales_by_payment_mode():
    """Pivot submitted Sales Invoices: one row per outlet, one column per payment mode.

    Returns a list of dicts with ``outlet``, ``total_invoices``,
    ``cash_total``, ``upi_total``, ``card_total`` and ``grand_total``,
    ordered by the summed grand total, highest first.
    """
    invoice = frappe.qb.DocType("Sales Invoice")

    def mode_total(mode, alias):
        # SUM(CASE WHEN mode_of_payment = <mode> THEN grand_total ELSE 0 END)
        only_this_mode = (
            Case()
            .when(invoice.mode_of_payment == mode, invoice.grand_total)
            .else_(0)
        )
        return Sum(only_this_mode).as_(alias)

    pivot = (
        frappe.qb.from_(invoice)
        .select(
            invoice.sj_franchise_outlet.as_("outlet"),
            Count("*").as_("total_invoices"),
            mode_total("Cash", "cash_total"),
            mode_total("UPI", "upi_total"),
            mode_total("Card", "card_total"),
            Sum(invoice.grand_total).as_("grand_total"),
        )
        .where(invoice.docstatus == 1)
        .groupby(invoice.sj_franchise_outlet)
        .orderby(Sum(invoice.grand_total), order=frappe.qb.desc)
    )
    return pivot.run(as_dict=True)

Problem: Get all customers who placed at least one order above 50,000 INR.

Solution: Use a subquery with frappe.qb.

scoopjoy/scoopjoy/utils/reports.py (append)
import frappe
from frappe.query_builder.functions import Max
from pypika.terms import ExistsCriterion
def high_value_customers(threshold=50000):
    """Return customers having at least one submitted Sales Invoice above ``threshold``.

    Uses a correlated EXISTS subquery, so each customer appears at most once
    regardless of how many invoices qualify. Rows are dicts with ``name``,
    ``customer_name`` and ``territory``.
    """
    customer = frappe.qb.DocType("Customer")
    invoice = frappe.qb.DocType("Sales Invoice")

    # Correlated on customer.name; one matching invoice is enough.
    qualifying_invoice = (
        frappe.qb.from_(invoice)
        .select(invoice.name)
        .where(invoice.customer == customer.name)
        .where(invoice.docstatus == 1)
        .where(invoice.grand_total > threshold)
        .limit(1)
    )

    customers = frappe.qb.from_(customer).select(
        customer.name, customer.customer_name, customer.territory
    )
    customers = customers.where(ExistsCriterion(qualifying_invoice))
    return customers.run(as_dict=True)
def customers_with_max_order_value():
    """Return up to 50 customers with their largest submitted invoice total.

    The maximum is computed by a correlated scalar subquery that is used both
    as the ``max_order_value`` column and as the descending sort key.
    """
    customer = frappe.qb.DocType("Customer")
    invoice = frappe.qb.DocType("Sales Invoice")

    # Scalar subquery: MAX(grand_total) over this customer's submitted invoices.
    largest_invoice = (
        frappe.qb.from_(invoice)
        .select(Max(invoice.grand_total))
        .where(invoice.customer == customer.name)
        .where(invoice.docstatus == 1)
    )

    ranked = frappe.qb.from_(customer).select(
        customer.name,
        customer.customer_name,
        largest_invoice.as_("max_order_value"),
    )
    ranked = ranked.orderby(largest_invoice, order=frappe.qb.desc).limit(50)
    return ranked.run(as_dict=True)

Problem: You need to filter a parent table based on a condition in a child/related table. Should you use EXISTS or JOIN?

Solution: Rules of thumb with benchmarkable examples — use EXISTS when you only need parent columns and a boolean “does at least one child match?” check; use JOIN when you need columns or aggregates from both tables.

scoopjoy/scoopjoy/utils/query_patterns.py
import frappe
from pypika.terms import ExistsCriterion
def outlets_with_complaints_exists():
    """Return outlets that have at least one unresolved complaint note.

    USE EXISTS when:
    - only parent columns are selected (no child columns in SELECT),
    - a parent may have many matching child rows (EXISTS yields no duplicates),
    - the question is boolean: "does at least one child match?"

    The database can stop probing a parent's children at the first match.
    """
    outlet = frappe.qb.DocType("SJ Franchise Outlet")
    note = frappe.qb.DocType("SJ Note")

    # Correlated on outlet.name: any open complaint attached to this outlet.
    open_complaint = (
        frappe.qb.from_(note)
        .select(note.name)
        .where(note.reference_doctype == "SJ Franchise Outlet")
        .where(note.reference_name == outlet.name)
        .where(note.note_type == "Complaint")
        .where(note.resolved == 0)
    )

    outlets = (
        frappe.qb.from_(outlet)
        .select(outlet.name, outlet.outlet_name, outlet.city)
        .where(ExistsCriterion(open_complaint))
    )
    return outlets.run(as_dict=True)
def outlets_with_complaint_details_join():
    """Return one row per unresolved complaint, with its outlet's columns.

    USE JOIN when:
    - columns from both tables are needed in SELECT,
    - child data must be aggregated (COUNT, SUM),
    - the relationship is 1:1 or nearly 1:1.

    For 1:N relationships a JOIN repeats the parent once per child; add
    GROUP BY or DISTINCT if that is not wanted. Here the repetition is
    intentional: each row is a complaint, newest first.
    """
    outlet = frappe.qb.DocType("SJ Franchise Outlet")
    note = frappe.qb.DocType("SJ Note")

    attached_to_outlet = (
        (note.reference_doctype == "SJ Franchise Outlet")
        & (note.reference_name == outlet.name)
    )

    complaints = (
        frappe.qb.from_(outlet)
        .join(note)
        .on(attached_to_outlet)
        .select(
            outlet.name,
            outlet.outlet_name,
            note.subject.as_("complaint_subject"),
            note.creation.as_("complaint_date"),
        )
        .where(note.note_type == "Complaint")
        .where(note.resolved == 0)
        .orderby(note.creation, order=frappe.qb.desc)
    )
    return complaints.run(as_dict=True)

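Problem: You need frappe.get_all queries that go beyond simple equality — OR conditions via or_filters, list-style conditions with an explicit doctype, and operators such as in, between, like, and is set / not set.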

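Solution: Leverage Frappe’s filter syntax — a dict of {field: [operator, value]} for the common case, or a list of [doctype, field, operator, value] entries when you want each condition spelled out explicitly.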

scoopjoy/scoopjoy/utils/query_patterns.py (append)
import frappe
def search_outlets_complex():
    """Run five frappe.get_all queries, each showcasing a different filter style.

    Returns a dict keyed by ``basic_and``, ``with_or``, ``like_pattern``,
    ``null_checks`` and ``nested_explicit``, each holding that query's rows.
    """
    doctype = "SJ Franchise Outlet"

    # Dict filters are ANDed together; each value is [operator, operand].
    and_filters = {
        "city": ["in", ["Mumbai", "Pune", "Nagpur"]],
        "status": ["!=", "Closed"],
        "monthly_rent": [">=", 50000],
        "agreement_date": ["between", ["2025-01-01", "2026-12-31"]],
    }
    basic_and = frappe.get_all(
        doctype,
        filters=and_filters,
        fields=["name", "outlet_name", "city", "monthly_rent"],
    )

    # or_filters: matching any ONE of these is enough (on top of `filters`).
    any_of = {
        "city": "Mumbai",
        "is_flagship": 1,
        "monthly_revenue": [">", 500000],
    }
    with_or = frappe.get_all(
        doctype,
        filters={"status": "Active"},
        or_filters=any_of,
        fields=["name", "outlet_name", "city"],
    )

    # LIKE / NOT LIKE pattern matching.
    pattern_filters = {
        "outlet_name": ["like", "%Premium%"],
        "city": ["not like", "%Test%"],
    }
    like_pattern = frappe.get_all(
        doctype,
        filters=pattern_filters,
        fields=["name", "outlet_name"],
    )

    # "is not set" -> IS NULL, "is set" -> IS NOT NULL.
    null_filters = {
        "closed_date": ["is", "not set"],
        "agreement_date": ["is", "set"],
    }
    null_checks = frappe.get_all(
        doctype,
        filters=null_filters,
        fields=["name", "outlet_name"],
    )

    # List-style filters name the doctype explicitly in every condition.
    explicit_filters = [
        ["SJ Franchise Outlet", "city", "in", ["Mumbai", "Delhi"]],
        ["SJ Franchise Outlet", "creation", ">", "2025-06-01"],
    ]
    nested_explicit = frappe.get_all(
        doctype,
        filters=explicit_filters,
        fields=["name", "outlet_name", "city", "creation"],
        order_by="creation desc",
        limit_page_length=50,
    )

    return {
        "basic_and": basic_and,
        "with_or": with_or,
        "like_pattern": like_pattern,
        "null_checks": null_checks,
        "nested_explicit": nested_explicit,
    }

Problem: Sometimes you need raw SQL for complex queries, but must avoid SQL injection.

Solution: ALWAYS use %s placeholders with frappe.db.sql. NEVER use f-strings or .format() for values.

scoopjoy/scoopjoy/utils/query_patterns.py (append)
import frappe
def raw_query_safe(city, min_revenue, start_date, end_date):
    """CORRECT: parameterized raw SQL — every value travels as a %s parameter.

    Returns, as dicts, the outlets in ``city`` whose submitted invoices posted
    between ``start_date`` and ``end_date`` total at least ``min_revenue``,
    highest revenue first.
    """
    revenue_sql = """
        SELECT
            o.name,
            o.outlet_name,
            o.city,
            SUM(si.grand_total) as total_revenue,
            COUNT(si.name) as invoice_count
        FROM `tabSJ Franchise Outlet` o
        INNER JOIN `tabSales Invoice` si
            ON si.sj_franchise_outlet = o.name
        WHERE o.city = %s
          AND si.docstatus = 1
          AND si.posting_date BETWEEN %s AND %s
        GROUP BY o.name
        HAVING total_revenue >= %s
        ORDER BY total_revenue DESC
    """
    # Order must follow the placeholders: city, date range, then the HAVING bound.
    params = (city, start_date, end_date, min_revenue)
    return frappe.db.sql(revenue_sql, params, as_dict=True)
def raw_query_DANGEROUS(city, min_revenue):
    """WRONG: SQL injection vulnerability. NEVER DO THIS.

    The anti-pattern this function warns about is interpolating a value
    straight into the SQL text, e.g.::

        frappe.db.sql(
            f"SELECT * FROM `tabSJ Franchise Outlet` WHERE city = '{city}'"
        )

    With city = "Mumbai'; DROP TABLE `tabSales Invoice`; --" the
    attacker-controlled text becomes part of the statement itself.

    Raises:
        NotImplementedError: Always. The function deliberately has no
            working body; it exists only as a warning.
    """
    # A builtin exception is used so the warning itself cannot fail with a
    # NameError on an undefined exception class.
    raise NotImplementedError(
        "This function exists only as a warning. Never use string interpolation in SQL."
    )
def raw_query_with_dynamic_table(doctype_name, filters):
    """Fetch the current user's 20 newest rows from a dynamically chosen DocType.

    Identifiers (table and column names) cannot be passed as %s parameters,
    so each one is validated against what actually exists before it is placed
    in the SQL text. Values always go through %s placeholders.

    Args:
        doctype_name: DocType whose table is queried; must exist.
        filters: Optional dict of ``{column: value}`` equality conditions
            ANDed onto the query. ``None`` or an empty dict adds nothing.

    Returns:
        List of dicts with ``name`` and ``creation``.

    Raises:
        Whatever frappe.throw raises, for an unknown DocType, a non-dict
        ``filters``, or a filter column that is not on the table.
    """
    # Validate the doctype exists (prevents table name injection)
    if not frappe.db.exists("DocType", doctype_name):
        frappe.throw(f"Invalid DocType: {doctype_name}")

    if filters and not isinstance(filters, dict):
        frappe.throw("filters must be a dict of {column: value}")

    conditions = ["owner = %s"]
    params = [frappe.session.user]
    for column, value in (filters or {}).items():
        # Same rule as for the table name: only known identifiers reach the SQL.
        if not frappe.db.has_column(doctype_name, column):
            frappe.throw(f"Invalid filter column: {column}")
        conditions.append(f"`{column}` = %s")
        params.append(value)

    table = f"tab{doctype_name}"
    where_clause = " AND ".join(conditions)
    return frappe.db.sql(
        f"SELECT name, creation FROM `{table}` WHERE {where_clause} ORDER BY creation DESC LIMIT 20",
        tuple(params),
        as_dict=True,
    )

Problem: Two users simultaneously approve the same leave application, causing double deduction from the leave balance.

Solution: Use SELECT ... FOR UPDATE to lock the row during the transaction.

scoopjoy/scoopjoy/utils/locking.py
import frappe
from frappe import _
def deduct_leave_balance(employee, leave_type, days):
    """Deduct ``days`` from an employee's leave balance under a row lock.

    The balance row is read with SELECT ... FOR UPDATE, which holds the row
    until the transaction commits; a concurrent transaction doing the same
    locking read on that row waits, so two approvals cannot both pass the
    balance check against the same starting value.

    Returns the remaining balance after the deduction. Raises (via
    frappe.throw) when no balance row exists or the balance is too small.
    """
    lock_sql = """
        SELECT name, remaining_leaves
        FROM `tabSJ Leave Balance`
        WHERE employee = %s AND leave_type = %s
        FOR UPDATE
    """
    locked_rows = frappe.db.sql(lock_sql, (employee, leave_type), as_dict=True)
    if not locked_rows:
        frappe.throw(_("No leave balance found for {0} - {1}").format(employee, leave_type))

    balance_row = locked_rows[0]
    available = balance_row.remaining_leaves
    if available < days:
        frappe.throw(
            _("Insufficient leave balance. Available: {0}, Requested: {1}").format(available, days)
        )

    # The row is still locked here, so the check above cannot go stale.
    deduct_sql = """
        UPDATE `tabSJ Leave Balance`
        SET remaining_leaves = remaining_leaves - %s,
            modified = NOW(),
            modified_by = %s
        WHERE name = %s
    """
    frappe.db.sql(deduct_sql, (days, frappe.session.user, balance_row.name))

    # The lock is released when the transaction commits
    # (at end of request or on an explicit commit).
    return available - days
def transfer_stock_between_warehouses(item_code, from_wh, to_wh, qty):
    """Move ``qty`` of an item from one warehouse's ledger row to another's.

    IMPORTANT: rows are always locked in a consistent order (sorted by
    warehouse name) so that two transfers running simultaneously in opposite
    directions request their locks in the same sequence and cannot deadlock
    on each other.

    Args:
        item_code: Item being moved.
        from_wh: Source warehouse; must hold at least ``qty``.
        to_wh: Destination warehouse; must already have a ledger row.
        qty: Quantity to move.

    Raises:
        Whatever frappe.throw raises, when the destination has no ledger row
        for the item or the source has insufficient stock.
    """
    lock_sql = """
        SELECT name FROM `tabSJ Stock Ledger`
        WHERE item_code = %s AND warehouse = %s
        FOR UPDATE
    """
    # Sort warehouses to ensure consistent lock ordering
    locked_rows = {}
    for wh in sorted([from_wh, to_wh]):
        locked_rows[wh] = frappe.db.sql(lock_sql, (item_code, wh))

    # Check the destination before touching anything: an UPDATE against a
    # missing row matches nothing, so the deducted stock would simply vanish.
    if not locked_rows[to_wh]:
        frappe.throw(_("No stock ledger row for {0} in {1}").format(item_code, to_wh))

    # Both rows are locked now — safe to update.
    # Deduct from source; the WHERE guard refuses to go below zero.
    frappe.db.sql(
        """
        UPDATE `tabSJ Stock Ledger`
        SET qty_on_hand = qty_on_hand - %s, modified = NOW()
        WHERE item_code = %s AND warehouse = %s AND qty_on_hand >= %s
        """,
        (qty, item_code, from_wh, qty),
    )
    # ROW_COUNT() (MariaDB/MySQL) is the number of rows the previous statement
    # changed; zero means the source row was missing or short of stock.
    affected = frappe.db.sql("SELECT ROW_COUNT() as cnt")[0][0]
    if not affected:
        frappe.throw(_("Insufficient stock in {0}").format(from_wh))

    # Add to destination
    frappe.db.sql(
        """
        UPDATE `tabSJ Stock Ledger`
        SET qty_on_hand = qty_on_hand + %s, modified = NOW()
        WHERE item_code = %s AND warehouse = %s
        """,
        (qty, item_code, to_wh),
    )