Materialized View Pattern
Problem: A ScoopJoy dashboard needs pre-aggregated daily sales data.
Computing it in real-time from Sales Invoice with 500K+ records takes 3+
seconds. You need sub-100ms response times.
Solution: Use a custom DocType as a materialized view — a Daily Sales Summary populated by a scheduled job and incrementally updated whenever an
invoice is submitted or cancelled.
Step 1: Create the materialized-view DocType
The summary DocType stores one row per outlet per day. The autoname rule
format:DSS-{posting_date}-{franchise_outlet} gives each row a deterministic
name so the refresh job can upsert by name, and fetch_from copies the outlet's
name and region onto each row automatically.
{
  "doctype": "DocType",
  "name": "Daily Sales Summary",
  "module": "ScoopJoy",
  "autoname": "format:DSS-{posting_date}-{franchise_outlet}",
  "is_submittable": 0,
  "allow_import": 1,
  "fields": [
    {"fieldname": "posting_date", "fieldtype": "Date", "in_list_view": 1, "reqd": 1},
    {"fieldname": "franchise_outlet", "fieldtype": "Link", "options": "Franchise Outlet", "in_list_view": 1, "reqd": 1},
    {"fieldname": "outlet_name", "fieldtype": "Data", "fetch_from": "franchise_outlet.outlet_name"},
    {"fieldname": "region", "fieldtype": "Data", "fetch_from": "franchise_outlet.region"},
    {"fieldname": "order_count", "fieldtype": "Int", "in_list_view": 1},
    {"fieldname": "gross_revenue", "fieldtype": "Currency", "in_list_view": 1},
    {"fieldname": "net_revenue", "fieldtype": "Currency"},
    {"fieldname": "total_tax", "fieldtype": "Currency"},
    {"fieldname": "total_discount", "fieldtype": "Currency"},
    {"fieldname": "avg_order_value", "fieldtype": "Currency"},
    {"fieldname": "items_sold", "fieldtype": "Float"},
    {"fieldname": "unique_customers", "fieldtype": "Int"},
    {"fieldname": "top_item", "fieldtype": "Data"},
    {"fieldname": "top_item_qty", "fieldtype": "Float"},
    {"fieldname": "last_refreshed", "fieldtype": "Datetime"}
  ],
  "permissions": [
    {"role": "Franchise Manager", "read": 1},
    {"role": "System Manager", "read": 1, "write": 1}
  ]
}
Step 2: The aggregation engine
refresh_daily_summary runs the expensive GROUP BY once, then upserts a
summary row per outlet. Because the document name is derived from the date and
outlet, an existing row is loaded and overwritten rather than duplicated.
import frappefrom frappe.utils import today, now, getdate, add_daysfrom frappe.query_builder.functions import Sum, Count, Avg, CountDistinct
def refresh_daily_summary(date=None, outlet=None): """ Recompute the Daily Sales Summary for a given date and/or outlet. If no args, refreshes today. """ target_date = getdate(date) if date else getdate(today())
SI = frappe.qb.DocType("Sales Invoice") SII = frappe.qb.DocType("Sales Invoice Item")
# Main aggregation query = ( frappe.qb.from_(SI) .join(SII).on(SII.parent == SI.name) .select( SI.posting_date, SI.franchise_outlet, Count(SI.name).as_("order_count"), Sum(SI.grand_total).as_("gross_revenue"), Sum(SI.net_total).as_("net_revenue"), Sum(SI.total_taxes_and_charges).as_("total_tax"), Sum(SI.discount_amount).as_("total_discount"), Avg(SI.grand_total).as_("avg_order_value"), Sum(SII.qty).as_("items_sold"), CountDistinct(SI.customer).as_("unique_customers"), ) .where(SI.docstatus == 1) .where(SI.posting_date == target_date) .groupby(SI.franchise_outlet) )
if outlet: query = query.where(SI.franchise_outlet == outlet)
rows = query.run(as_dict=True)
# Top item per outlet (batch) outlet_names = [r.franchise_outlet for r in rows] top_items = _get_top_items_for_date(target_date, outlet_names)
# Upsert each row for row in rows: summary_name = f"DSS-{target_date}-{row.franchise_outlet}"
if frappe.db.exists("Daily Sales Summary", summary_name): doc = frappe.get_doc("Daily Sales Summary", summary_name) else: doc = frappe.new_doc("Daily Sales Summary") doc.name = summary_name doc.posting_date = target_date doc.franchise_outlet = row.franchise_outlet
doc.order_count = row.order_count doc.gross_revenue = row.gross_revenue doc.net_revenue = row.net_revenue doc.total_tax = row.total_tax doc.total_discount = row.total_discount doc.avg_order_value = row.avg_order_value doc.items_sold = row.items_sold doc.unique_customers = row.unique_customers
top = top_items.get(row.franchise_outlet) if top: doc.top_item = top["item_name"] doc.top_item_qty = top["total_qty"]
doc.last_refreshed = now() doc.save(ignore_permissions=True)
frappe.db.commit() return len(rows)The “top item per outlet” is a windowed query — ROW_NUMBER() OVER (PARTITION BY ...) ranks items within each outlet, and the outer query keeps only the
rank-1 row. This is one of those cases where raw SQL is clearer than the query
builder:
SELECT franchise_outlet, item_name, total_qtyFROM ( SELECT si.franchise_outlet, sii.item_name, SUM(sii.qty) AS total_qty, ROW_NUMBER() OVER ( PARTITION BY si.franchise_outlet ORDER BY SUM(sii.qty) DESC ) AS rn FROM `tabSales Invoice Item` sii JOIN `tabSales Invoice` si ON sii.parent = si.name WHERE si.docstatus = 1 AND si.posting_date = %(date)s AND si.franchise_outlet IN %(outlets)s GROUP BY si.franchise_outlet, sii.item_name) ranked WHERE rn = 1def _get_top_items_for_date(date, outlet_names): if not outlet_names: return {}
result = frappe.db.sql( """ /* windowed top-item query shown above */ """, {"date": date, "outlets": outlet_names}, as_dict=True, )
return {r.franchise_outlet: r for r in result}Incremental update on submit/cancel
Wired through doc_events, on_sales_invoice_change refreshes only the affected
outlet and date when an invoice is submitted or cancelled — keeping the summary
near-real-time without rescanning everything.
def on_sales_invoice_change(doc, method): """ Called via doc_events hook when a Sales Invoice is submitted or cancelled. Refreshes only the affected outlet + date. """ if doc.franchise_outlet and doc.posting_date: refresh_daily_summary(date=doc.posting_date, outlet=doc.franchise_outlet)Scheduled nightly refresh
The nightly job refreshes today and yesterday, so invoices that landed late for either day are picked up even if no submit event fired for them. Entries backdated further than yesterday need rebuild_historical_summaries.
def scheduled_daily_refresh(): """ Nightly job: refresh today and yesterday (catches late entries). Register in hooks.py scheduler_events. """ refresh_daily_summary(date=today()) refresh_daily_summary(date=add_days(today(), -1)) frappe.logger().info("Daily Sales Summary: nightly refresh complete")Rebuild historical data
For backfilling a whole range — run from the bench console — iterate day by day and publish progress so the operator sees movement on large rebuilds.
def rebuild_historical_summaries(from_date, to_date): """ Rebuild summaries for a date range. Use from bench console: >>> from scoopjoy.scoopjoy.utils.daily_summary import rebuild_historical_summaries >>> rebuild_historical_summaries("2025-01-01", "2025-03-31") """ current = getdate(from_date) end = getdate(to_date) count = 0
while current <= end: rows = refresh_daily_summary(date=current) count += rows current = add_days(current, 1)
if count % 100 == 0: frappe.publish_progress( percent=((getdate(str(current)) - getdate(from_date)).days / (end - getdate(from_date)).days * 100), title="Rebuilding Sales Summaries", description=f"Processing {current}...", )
frappe.db.commit() frappe.logger().info(f"Rebuilt {count} daily summary records from {from_date} to {to_date}") return countStep 3: Dashboard API reads from the summary
Every dashboard metric now queries the small summary table instead of the raw invoice table — month-to-date totals, a 30-day trend, and the top 10 outlets are three quick aggregations over a few thousand rows.
import frappefrom frappe.utils import today, add_days, get_first_dayfrom frappe.query_builder.functions import Sum, Avg
@frappe.whitelist()def get_dashboard_metrics(): """ Dashboard reads from Daily Sales Summary (pre-aggregated). Sub-100ms response time vs 3+ seconds from raw Sales Invoice. """ DSS = frappe.qb.DocType("Daily Sales Summary") month_start = get_first_day(today())
# Month-to-date metrics -- single query on the summary table mtd = ( frappe.qb.from_(DSS) .select( Sum(DSS.gross_revenue).as_("total_revenue"), Sum(DSS.order_count).as_("total_orders"), Avg(DSS.avg_order_value).as_("avg_order_value"), Sum(DSS.unique_customers).as_("total_customers"), ) .where(DSS.posting_date >= month_start) ).run(as_dict=True)[0]
# Daily trend for the last 30 days trend = ( frappe.qb.from_(DSS) .select( DSS.posting_date, Sum(DSS.gross_revenue).as_("revenue"), Sum(DSS.order_count).as_("orders"), ) .where(DSS.posting_date >= add_days(today(), -30)) .groupby(DSS.posting_date) .orderby(DSS.posting_date) ).run(as_dict=True)
# Top outlets this month top_outlets = ( frappe.qb.from_(DSS) .select( DSS.franchise_outlet, DSS.outlet_name, DSS.region, Sum(DSS.gross_revenue).as_("revenue"), Sum(DSS.order_count).as_("orders"), ) .where(DSS.posting_date >= month_start) .groupby(DSS.franchise_outlet) .orderby(Sum(DSS.gross_revenue), order=frappe.qb.desc) .limit(10) ).run(as_dict=True)
return { "mtd": mtd, "trend": trend, "top_outlets": top_outlets, }Step 4: Wire it up in hooks.py
Register the incremental update on the Sales Invoice lifecycle events and the
nightly refresh under scheduler_events.
# Handlers shared by Sales Invoice submit and cancel: first drop cached sales
# data, then refresh the affected Daily Sales Summary row. The leading
# underscore marks this name as private to the module.
_SALES_INVOICE_HANDLERS = (
    "scoopjoy.scoopjoy.utils.cache_manager.invalidate_sales_cache",
    "scoopjoy.scoopjoy.utils.daily_summary.on_sales_invoice_change",
)

# Document lifecycle hooks. Each event gets its own list copy so the two
# entries never alias each other.
doc_events = {
    "Sales Invoice": {
        "on_submit": list(_SALES_INVOICE_HANDLERS),
        "on_cancel": list(_SALES_INVOICE_HANDLERS),
    },
}

# Scheduler hooks: jobs registered under the "daily" event.
scheduler_events = {
    "daily": [
        "scoopjoy.scoopjoy.utils.cache_manager.warm_caches",
        "scoopjoy.scoopjoy.utils.daily_summary.scheduled_daily_refresh",
    ],
}