Skip to content

Materialized View Pattern

Problem: A ScoopJoy dashboard needs pre-aggregated daily sales data. Computing it in real-time from Sales Invoice with 500K+ records takes 3+ seconds. You need sub-100ms response times.

Solution: Use a custom DocType as a materialized view — a Daily Sales Summary populated by a scheduled job and incrementally updated whenever an invoice is submitted or cancelled.

Step 1: Create the materialized-view DocType

Section titled “Step 1: Create the materialized-view DocType”

The summary DocType stores one row per outlet per day. The autoname format:DSS-{posting_date}-{franchise_outlet} gives each row a deterministic name so the refresh job can upsert by name, and fetch_from pulls outlet metadata for free.

scoopjoy/scoopjoy/doctype/daily_sales_summary/daily_sales_summary.json
{
"doctype": "DocType",
"name": "Daily Sales Summary",
"module": "ScoopJoy",
"autoname": "format:DSS-{posting_date}-{franchise_outlet}",
"is_submittable": 0,
"allow_import": 1,
"fields": [
{"fieldname": "posting_date", "fieldtype": "Date", "in_list_view": 1, "reqd": 1},
{"fieldname": "franchise_outlet", "fieldtype": "Link", "options": "Franchise Outlet", "in_list_view": 1, "reqd": 1},
{"fieldname": "outlet_name", "fieldtype": "Data", "fetch_from": "franchise_outlet.outlet_name"},
{"fieldname": "region", "fieldtype": "Data", "fetch_from": "franchise_outlet.region"},
{"fieldname": "order_count", "fieldtype": "Int", "in_list_view": 1},
{"fieldname": "gross_revenue", "fieldtype": "Currency", "in_list_view": 1},
{"fieldname": "net_revenue", "fieldtype": "Currency"},
{"fieldname": "total_tax", "fieldtype": "Currency"},
{"fieldname": "total_discount", "fieldtype": "Currency"},
{"fieldname": "avg_order_value", "fieldtype": "Currency"},
{"fieldname": "items_sold", "fieldtype": "Float"},
{"fieldname": "unique_customers", "fieldtype": "Int"},
{"fieldname": "top_item", "fieldtype": "Data"},
{"fieldname": "top_item_qty", "fieldtype": "Float"},
{"fieldname": "last_refreshed", "fieldtype": "Datetime"}
],
"permissions": [
{"role": "Franchise Manager", "read": 1},
{"role": "System Manager", "read": 1, "write": 1}
]
}

refresh_daily_summary runs the expensive GROUP BY once, then upserts a summary row per outlet. Because the document name is derived from the date and outlet, an existing row is loaded and overwritten rather than duplicated.

scoopjoy/scoopjoy/utils/daily_summary.py
import frappe
from frappe.utils import today, now, getdate, add_days
from frappe.query_builder.functions import Sum, Count, Avg, CountDistinct
def refresh_daily_summary(date=None, outlet=None):
    """
    Recompute the Daily Sales Summary for a given date and/or outlet.

    If no args, refreshes today for every outlet that has submitted invoices.

    Args:
        date: Posting date to refresh (anything ``getdate`` accepts).
            Defaults to today.
        outlet: Optional Franchise Outlet name. When given, only that
            outlet's summary row is touched.

    Returns:
        int: Number of summary rows inserted or updated.

    Side effects:
        Saves/deletes Daily Sales Summary documents and commits the current
        database transaction before returning.
    """
    target_date = getdate(date) if date else getdate(today())
    SI = frappe.qb.DocType("Sales Invoice")
    SII = frappe.qb.DocType("Sales Invoice Item")

    # Header-level aggregation. Deliberately NOT joined to the item table:
    # a join produces one row per invoice line, which would count every
    # invoice once per line and multiply each header amount accordingly.
    header_query = (
        frappe.qb.from_(SI)
        .select(
            SI.franchise_outlet,
            Count(SI.name).as_("order_count"),
            Sum(SI.grand_total).as_("gross_revenue"),
            Sum(SI.net_total).as_("net_revenue"),
            Sum(SI.total_taxes_and_charges).as_("total_tax"),
            Sum(SI.discount_amount).as_("total_discount"),
            Avg(SI.grand_total).as_("avg_order_value"),
            CountDistinct(SI.customer).as_("unique_customers"),
        )
        .where(SI.docstatus == 1)
        .where(SI.posting_date == target_date)
        .groupby(SI.franchise_outlet)
    )

    # Item-level aggregation: the only metric that needs the child table.
    item_query = (
        frappe.qb.from_(SII)
        .join(SI).on(SII.parent == SI.name)
        .select(
            SI.franchise_outlet,
            Sum(SII.qty).as_("items_sold"),
        )
        .where(SI.docstatus == 1)
        .where(SI.posting_date == target_date)
        .groupby(SI.franchise_outlet)
    )

    if outlet:
        header_query = header_query.where(SI.franchise_outlet == outlet)
        item_query = item_query.where(SI.franchise_outlet == outlet)

    # franchise_outlet is a required Link on the summary, so a group with an
    # empty outlet cannot be saved; skip it instead of aborting the refresh.
    rows = [r for r in header_query.run(as_dict=True) if r.franchise_outlet]
    items_sold_by_outlet = {
        r.franchise_outlet: r.items_sold for r in item_query.run(as_dict=True)
    }

    # Top item per outlet (batch)
    outlet_names = [r.franchise_outlet for r in rows]
    top_items = _get_top_items_for_date(target_date, outlet_names)

    # Upsert each row
    for row in rows:
        summary_name = f"DSS-{target_date}-{row.franchise_outlet}"
        if frappe.db.exists("Daily Sales Summary", summary_name):
            doc = frappe.get_doc("Daily Sales Summary", summary_name)
        else:
            doc = frappe.new_doc("Daily Sales Summary")
            doc.name = summary_name
        doc.posting_date = target_date
        doc.franchise_outlet = row.franchise_outlet
        doc.order_count = row.order_count
        doc.gross_revenue = row.gross_revenue
        doc.net_revenue = row.net_revenue
        doc.total_tax = row.total_tax
        doc.total_discount = row.total_discount
        doc.avg_order_value = row.avg_order_value
        doc.items_sold = items_sold_by_outlet.get(row.franchise_outlet) or 0
        doc.unique_customers = row.unique_customers
        # Always assign both fields so a previous top item is never left behind.
        top = top_items.get(row.franchise_outlet)
        doc.top_item = top["item_name"] if top else None
        doc.top_item_qty = top["total_qty"] if top else 0
        doc.last_refreshed = now()
        doc.save(ignore_permissions=True)

    # Remove summaries in scope that no longer have any submitted invoice
    # behind them (e.g. the outlet's last invoice of the day was cancelled);
    # otherwise the old totals would stay visible forever.
    stale_filters = [["posting_date", "=", target_date]]
    if outlet:
        stale_filters.append(["franchise_outlet", "=", outlet])
    if outlet_names:
        stale_filters.append(["franchise_outlet", "not in", outlet_names])
    stale_names = frappe.get_all(
        "Daily Sales Summary", filters=stale_filters, pluck="name"
    )
    for stale_name in stale_names:
        frappe.delete_doc("Daily Sales Summary", stale_name, ignore_permissions=True)

    # NOTE(review): when this runs inside a doc_events handler, the commit
    # also commits the caller's in-flight transaction -- confirm that is intended.
    frappe.db.commit()
    return len(rows)

The “top item per outlet” is a windowed query — ROW_NUMBER() OVER (PARTITION BY ...) ranks items within each outlet, and the outer query keeps only the rank-1 row. This is one of those cases where raw SQL is clearer than the query builder:

scoopjoy/scoopjoy/utils/daily_summary.py — _get_top_items_for_date()
-- Top-selling item per outlet for a single posting date.
-- The inner query totals quantity per (outlet, item) and ranks the items
-- within each outlet; the outer query keeps only the rank-1 row.
SELECT franchise_outlet, item_name, total_qty
FROM (
    SELECT
        si.franchise_outlet,
        sii.item_name,
        SUM(sii.qty) AS total_qty,
        ROW_NUMBER() OVER (
            PARTITION BY si.franchise_outlet
            -- item_name is a tie-breaker: without it, items with equal
            -- quantity are ranked arbitrarily and the reported top item
            -- can change from one refresh to the next.
            ORDER BY SUM(sii.qty) DESC, sii.item_name ASC
        ) AS rn
    FROM `tabSales Invoice Item` sii
    JOIN `tabSales Invoice` si ON sii.parent = si.name
    WHERE si.docstatus = 1
      AND si.posting_date = %(date)s
      AND si.franchise_outlet IN %(outlets)s
    GROUP BY si.franchise_outlet, sii.item_name
) ranked
WHERE rn = 1
scoopjoy/scoopjoy/utils/daily_summary.py
def _get_top_items_for_date(date, outlet_names):
    """
    Look up the top item for each of the given outlets on one date.

    Args:
        date: Value bound to the query's ``date`` parameter.
        outlet_names: Outlet names bound to the query's ``outlets`` parameter.

    Returns:
        dict: Result rows keyed by their ``franchise_outlet`` value.
        Empty when ``outlet_names`` is empty (no query is issued).
    """
    # Nothing to look up -- skip the database round trip entirely.
    if not outlet_names:
        return {}

    params = {"date": date, "outlets": outlet_names}
    ranked_rows = frappe.db.sql(
        """ /* windowed top-item query shown above */ """,
        params,
        as_dict=True,
    )

    top_by_outlet = {}
    for entry in ranked_rows:
        top_by_outlet[entry.franchise_outlet] = entry
    return top_by_outlet

Wired through doc_events, on_sales_invoice_change refreshes only the affected outlet and date when an invoice is submitted or cancelled — keeping the summary near-real-time without rescanning everything.

scoopjoy/scoopjoy/utils/daily_summary.py
def on_sales_invoice_change(doc, method):
    """
    doc_events handler for Sales Invoice submit/cancel.

    Triggers a summary refresh limited to the invoice's own outlet and
    posting date. Does nothing when either value is missing on the document.

    Args:
        doc: The Sales Invoice document that changed.
        method: Name of the event that fired (unused).
    """
    # Guard clause: without both keys there is no summary row to target.
    if not (doc.franchise_outlet and doc.posting_date):
        return
    refresh_daily_summary(date=doc.posting_date, outlet=doc.franchise_outlet)

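The nightly job refreshes today and yesterday, so late entries for either day are caught even if no submit event fired for them. Entries backdated further than yesterday are not covered by this job; use the historical rebuild below for those dates.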

scoopjoy/scoopjoy/utils/daily_summary.py
def scheduled_daily_refresh():
    """
    Nightly job: rebuild the summaries for today, then for yesterday
    (picks up entries that arrived late for the previous day).

    Register in hooks.py scheduler_events.
    """
    current_day = today()
    refresh_daily_summary(date=current_day)

    previous_day = add_days(today(), -1)
    refresh_daily_summary(date=previous_day)

    logger = frappe.logger()
    logger.info("Daily Sales Summary: nightly refresh complete")

For backfilling a whole range — run from the bench console — iterate day by day and publish progress so the operator sees movement on large rebuilds.

scoopjoy/scoopjoy/utils/daily_summary.py
def rebuild_historical_summaries(from_date, to_date):
    """
    Rebuild summaries for a date range (both ends inclusive). Use from bench console:
    >>> from scoopjoy.scoopjoy.utils.daily_summary import rebuild_historical_summaries
    >>> rebuild_historical_summaries("2025-01-01", "2025-03-31")

    Args:
        from_date: First posting date to rebuild (anything ``getdate`` accepts).
        to_date: Last posting date to rebuild (anything ``getdate`` accepts).

    Returns:
        int: Total number of summary rows written across the range.
        0 when ``from_date`` is after ``to_date`` (nothing is processed).
    """
    start = getdate(from_date)
    end = getdate(to_date)
    # Inclusive number of days. The loop body only runs when this is >= 1,
    # so the percentage below can never divide by zero (single-day ranges
    # included).
    total_days = (end - start).days + 1

    count = 0
    days_done = 0
    current = start
    while current <= end:
        count += refresh_daily_summary(date=current)
        days_done += 1
        # Progress is driven by days processed, not rows written, so it
        # advances steadily and ends at exactly 100.
        frappe.publish_progress(
            percent=days_done / total_days * 100,
            title="Rebuilding Sales Summaries",
            description=f"Processing {current}...",
        )
        # getdate() normalises whatever type add_days returns back to a date.
        current = getdate(add_days(current, 1))

    frappe.db.commit()
    frappe.logger().info(f"Rebuilt {count} daily summary records from {from_date} to {to_date}")
    return count

Step 3: Dashboard API reads from the summary

Section titled “Step 3: Dashboard API reads from the summary”

Every dashboard metric now queries the small summary table instead of the raw invoice table — month-to-date totals, a 30-day trend, and the top 10 outlets are three quick aggregations over a few thousand rows.

scoopjoy/scoopjoy/api/dashboard.py
import frappe
from frappe.utils import today, add_days, get_first_day
from frappe.query_builder.functions import Sum, Avg
@frappe.whitelist()
def get_dashboard_metrics():
    """
    Dashboard reads from Daily Sales Summary (pre-aggregated).
    Sub-100ms response time vs 3+ seconds from raw Sales Invoice.

    Returns:
        dict with three keys:
            "mtd": month-to-date totals (``total_revenue``, ``total_orders``,
                ``avg_order_value``, ``total_customers``); all 0 when the
                month has no summary rows yet.
            "trend": one row per posting date for the last 30 days
                (``posting_date``, ``revenue``, ``orders``).
            "top_outlets": up to 10 outlets ranked by month-to-date revenue.
    """
    # NOTE(review): the query builder calls below do not apply DocType
    # permissions -- confirm every caller of this whitelisted method is
    # meant to see these figures.
    DSS = frappe.qb.DocType("Daily Sales Summary")
    month_start = get_first_day(today())

    # Month-to-date metrics -- single query on the summary table
    totals = (
        frappe.qb.from_(DSS)
        .select(
            Sum(DSS.gross_revenue).as_("total_revenue"),
            Sum(DSS.order_count).as_("total_orders"),
            Sum(DSS.unique_customers).as_("total_customers"),
        )
        .where(DSS.posting_date >= month_start)
    ).run(as_dict=True)[0]

    # SUM() over zero rows is NULL; normalise so the client always gets numbers.
    total_revenue = totals.total_revenue or 0
    total_orders = totals.total_orders or 0
    # Average order value must be revenue / orders. Averaging the stored
    # per-row averages would weight a 1-order day the same as a 500-order day.
    avg_order_value = (
        float(total_revenue) / float(total_orders) if total_orders else 0
    )
    mtd = frappe._dict(
        total_revenue=total_revenue,
        total_orders=total_orders,
        avg_order_value=avg_order_value,
        # Sum of the per-row unique_customers values, so a customer who
        # appears in several summary rows is counted once per row.
        total_customers=totals.total_customers or 0,
    )

    # Daily trend for the last 30 days
    trend = (
        frappe.qb.from_(DSS)
        .select(
            DSS.posting_date,
            Sum(DSS.gross_revenue).as_("revenue"),
            Sum(DSS.order_count).as_("orders"),
        )
        .where(DSS.posting_date >= add_days(today(), -30))
        .groupby(DSS.posting_date)
        .orderby(DSS.posting_date)
    ).run(as_dict=True)

    # Top outlets this month
    top_outlets = (
        frappe.qb.from_(DSS)
        .select(
            DSS.franchise_outlet,
            DSS.outlet_name,
            DSS.region,
            Sum(DSS.gross_revenue).as_("revenue"),
            Sum(DSS.order_count).as_("orders"),
        )
        .where(DSS.posting_date >= month_start)
        .groupby(DSS.franchise_outlet)
        .orderby(Sum(DSS.gross_revenue), order=frappe.qb.desc)
        .limit(10)
    ).run(as_dict=True)

    return {
        "mtd": mtd,
        "trend": trend,
        "top_outlets": top_outlets,
    }

Register the incremental update on the Sales Invoice lifecycle events and the nightly refresh under scheduler_events.

scoopjoy/hooks.py
# Handlers attached to each Sales Invoice lifecycle event below, in the
# order they are listed. The leading underscore keeps this helper name
# visibly private to the hooks module.
_SALES_INVOICE_HANDLERS = (
    "scoopjoy.scoopjoy.utils.cache_manager.invalidate_sales_cache",
    "scoopjoy.scoopjoy.utils.daily_summary.on_sales_invoice_change",
)

# Submit and cancel both get their own copy of the same handler list.
doc_events = {
    "Sales Invoice": {
        event: list(_SALES_INVOICE_HANDLERS)
        for event in ("on_submit", "on_cancel")
    }
}
# Jobs registered under the "daily" scheduler event, in listed order.
_DAILY_JOBS = [
    "scoopjoy.scoopjoy.utils.cache_manager.warm_caches",
    "scoopjoy.scoopjoy.utils.daily_summary.scheduled_daily_refresh",
]

scheduler_events = {"daily": _DAILY_JOBS}