Scheduled Task Recipes
Problem: ScoopJoy’s franchise operation needs five different recurring jobs — daily royalty invoicing, hourly POS stock sync, a 15-minute outlet heartbeat check, a weekly performance digest, and monthly franchise-fee invoices. Each runs on its own cadence and must be safe to re-run.
Solution: Declare every task in scheduler_events inside hooks.py, mixing
named intervals (hourly, weekly, monthly) with cron expressions for finer
control. Frappe’s scheduler dispatches each dotted path on its cadence; the task
functions themselves stay idempotent and lightweight.
Registering the schedule
The scheduler_events dict maps a cadence to a list of dotted Python paths. The
cron key takes a nested dict of cron expressions — here 0 6 * * * (daily at
6 AM) and */15 * * * * (every 15 minutes) — while hourly, weekly, and
monthly are Frappe’s named intervals.
# Maps each cadence to the dotted paths of the task functions run on it.
scheduler_events = {
    # (b) Hourly: sync stock levels from POS terminals
    "hourly": ["scoopjoy.tasks.stock.hourly_pos_stock_sync"],
    # (d) Weekly: send franchise performance digest
    "weekly": ["scoopjoy.tasks.reports.send_weekly_performance_digest"],
    # (e) Monthly: auto-create recurring franchise fee invoices
    "monthly": ["scoopjoy.tasks.billing.create_monthly_franchise_fee_invoices"],
    # Cron-expression schedules, keyed by the expression itself
    "cron": {
        # (a) Daily at 6 AM: generate franchise royalty invoices
        "0 6 * * *": ["scoopjoy.tasks.royalty.generate_daily_royalty_invoices"],
        # (c) Every 15 minutes: check outlet health/heartbeat
        "*/15 * * * *": ["scoopjoy.tasks.health.check_outlet_heartbeat"],
    },
}

# Task (a): Daily Royalty Invoice Generation
Driven by the 0 6 * * * cron entry. It pulls active franchise agreements, then
creates one royalty Sales Invoice per franchise from yesterday’s POS sales. The
frappe.db.exists guard at the top of create_royalty_invoice makes the whole run
idempotent — a re-run on the same date creates nothing.
import frappefrom frappe import _from frappe.utils import today, add_days, flt, getdate
def generate_daily_royalty_invoices():
    """
    Scheduled daily at 6 AM via cron.
    Creates Sales Invoices for franchise royalty based on yesterday's POS sales.

    Every submitted, Active agreement whose start/end dates cover yesterday
    is passed to ``create_royalty_invoice``. Each agreement is committed or
    rolled back on its own, so a failure part-way through one invoice
    (for example after ``insert`` but before ``submit``) cannot leave a
    half-created draft behind that would block the idempotency check on the
    next run.
    """
    yesterday = add_days(today(), -1)

    active_agreements = frappe.get_all(
        "Franchise Agreement",
        filters={
            "docstatus": 1,
            "agreement_status": "Active",
            "start_date": ("<=", yesterday),
            "end_date": (">=", yesterday),
        },
        fields=[
            "name",
            "franchise_name",
            "franchisee",
            "company",
            "royalty_percentage",
            "territory",
        ],
    )

    created = 0
    for agreement in active_agreements:
        try:
            invoice = create_royalty_invoice(agreement, yesterday)
            if invoice:
                created += 1
        except Exception:
            # Discard anything this agreement wrote before it failed, then
            # record the error (log_error is called inside the handler so the
            # active traceback is available to it).
            frappe.db.rollback()
            frappe.log_error(
                title=_("Royalty Invoice Error: {0}").format(agreement.franchise_name),
            )

        # Commit per agreement so a rollback triggered by a later agreement
        # cannot undo this invoice or this error log entry.
        frappe.db.commit()

    frappe.logger("scoopjoy").info(
        f"Royalty invoices: {created}/{len(active_agreements)} created for {yesterday}"
    )
def create_royalty_invoice(agreement, date):
    """Create and submit a single royalty invoice for one franchise agreement.

    Args:
        agreement: Agreement row providing ``name``, ``franchisee``,
            ``company``, ``franchise_name`` and ``royalty_percentage``.
        date: The sales date being billed.

    Returns:
        The submitted Sales Invoice, or ``None`` when a non-cancelled invoice
        already exists for this agreement and date, or when there are no
        positive POS sales to bill.

    Exceptions raised by ``insert``/``submit`` are not caught here; the
    caller decides how to log and roll back.
    """
    # Check if already created (idempotent)
    existing = frappe.db.exists(
        "Sales Invoice",
        {
            "custom_franchise_agreement": agreement.name,
            "custom_royalty_date": date,
            "docstatus": ("!=", 2),
        },
    )
    if existing:
        return None

    # Calculate the day's total submitted POS sales for this franchise
    total_sales = frappe.db.sql(
        """
        SELECT COALESCE(SUM(grand_total), 0) as total
        FROM `tabPOS Invoice`
        WHERE posting_date = %(date)s
          AND docstatus = 1
          AND custom_franchise_agreement = %(agreement)s
        """,
        {"date": date, "agreement": agreement.name},
        as_dict=True,
    )[0].total

    # Nothing to bill on a zero day. A net-negative day (returns exceeding
    # sales) is skipped too, since it would produce a negative item rate.
    if flt(total_sales) <= 0:
        return None

    royalty_amount = flt(total_sales) * flt(agreement.royalty_percentage) / 100

    invoice = frappe.get_doc(
        {
            "doctype": "Sales Invoice",
            "customer": agreement.franchisee,
            "company": agreement.company,
            # ERPNext resets posting_date to the current date during
            # validation unless set_posting_time is checked; set it so the
            # invoice stays dated on the sales day it bills.
            "set_posting_time": 1,
            "posting_date": date,
            "due_date": add_days(date, 15),
            "custom_franchise_agreement": agreement.name,
            "custom_royalty_date": date,
            "items": [
                {
                    "item_code": "Franchise Royalty Fee",
                    "qty": 1,
                    "rate": royalty_amount,
                    "description": _("Royalty at {0}% on sales of {1} for {2} ({3})").format(
                        agreement.royalty_percentage,
                        frappe.format_value(total_sales, {"fieldtype": "Currency"}),
                        agreement.franchise_name,
                        date,
                    ),
                }
            ],
        }
    )
    invoice.insert(ignore_permissions=True)
    invoice.submit()
    return invoice

# Task (b): Hourly POS Stock Sync
The hourly task is the textbook lightweight dispatcher: it counts active outlets,
then hands the heavy work to a background job via frappe.enqueue on the long
queue. Using a fixed job_id plus deduplicate=True means a slow run won’t pile up
duplicate jobs.
import frappefrom frappe import _
def hourly_pos_stock_sync():
    """
    Hourly scheduler task: a thin dispatcher.

    Counts submitted, Active franchise agreements and, if there is at least
    one, enqueues ``scoopjoy.stock_sync.process_stock_sync`` on the long
    queue. The actual sync work happens in that background job, not here.
    """
    agreement_filters = {"docstatus": 1, "agreement_status": "Active"}
    active_count = frappe.db.count("Franchise Agreement", agreement_filters)

    # No active outlets means there is nothing to sync this hour.
    if active_count == 0:
        return

    # Fixed job id + deduplicate, as passed to frappe.enqueue below.
    job_options = {
        "queue": "long",
        "timeout": 1800,
        "job_id": "scoopjoy_stock_sync::hourly",
        "deduplicate": True,
    }
    frappe.enqueue("scoopjoy.stock_sync.process_stock_sync", **job_options)

    frappe.db.commit()
    frappe.logger("scoopjoy").info(f"Hourly stock sync enqueued for {active_count} outlets.")

# Task (c): Outlet Heartbeat Check (Every 15 Minutes)
Driven by */15 * * * *. Each run flags outlets whose last heartbeat is older than
30 minutes, flips their status to Offline only on a transition, and sends a single
consolidated alert rather than one email per outlet.
import frappefrom frappe import _from frappe.utils import now_datetime, time_diff_in_seconds
def check_outlet_heartbeat():
    """
    Cron: every 15 minutes.
    Check if franchise POS terminals have reported in recently.
    Flag outlets that haven't sent data in 30+ minutes.

    An outlet counts as offline when it has no recorded heartbeat at all or
    when its last heartbeat is older than the threshold. Offline outlets not
    already marked "Offline" have ``custom_outlet_status`` updated, and one
    consolidated alert is sent for all offline outlets found in this run.
    """
    threshold_seconds = 1800  # 30 minutes

    # The current status is fetched with the rest of the row so the loop
    # below does not need one extra query per offline outlet.
    active_outlets = frappe.get_all(
        "Franchise Agreement",
        filters={"docstatus": 1, "agreement_status": "Active"},
        fields=[
            "name",
            "franchise_name",
            "franchise_manager_email",
            "custom_last_heartbeat",
            "custom_outlet_status",
        ],
    )

    now = now_datetime()
    offline_outlets = []

    for outlet in active_outlets:
        last_heartbeat = outlet.custom_last_heartbeat
        if last_heartbeat and time_diff_in_seconds(now, last_heartbeat) <= threshold_seconds:
            continue  # reported recently enough

        offline_outlets.append(outlet)

        # Write only on a transition. Outlets that never reported are
        # included, so the stored status matches what the alert says.
        if outlet.custom_outlet_status != "Offline":
            frappe.db.set_value(
                "Franchise Agreement", outlet.name, "custom_outlet_status", "Offline"
            )

    # Persist the status changes before attempting to send mail, so a
    # failure while sending cannot discard them.
    frappe.db.commit()

    if offline_outlets:
        try:
            notify_operations_team(offline_outlets)
        except Exception:
            frappe.log_error(title=_("Outlet Heartbeat Alert Error"))

    frappe.db.commit()

    frappe.logger("scoopjoy").info(
        f"Heartbeat check: {len(offline_outlets)}/{len(active_outlets)} outlets offline."
    )
def notify_operations_team(offline_outlets):
    """Email one alert that covers every outlet in ``offline_outlets``.

    Recipients come from ``get_operations_team_emails``. The message uses the
    ``outlet_offline_alert`` template and is sent with ``now=True``.
    """
    recipients = get_operations_team_emails()
    offline_count = len(offline_outlets)
    subject = _("ScoopJoy Alert: {0} Outlet(s) Offline").format(offline_count)

    # Only the franchise names are handed to the template, not the full rows.
    template_args = {
        "offline_outlets": [outlet.franchise_name for outlet in offline_outlets],
        "count": offline_count,
        "check_time": now_datetime(),
    }

    frappe.sendmail(
        recipients=recipients,
        subject=subject,
        template="outlet_offline_alert",
        args=template_args,
        now=True,
    )
def get_operations_team_emails():
    """Get the users who hold the Operations Manager role.

    Uses Frappe's ``get_users_with_role`` helper instead of reading
    ``Has Role`` rows directly, so disabled users and the Administrator
    account are not returned as alert recipients.

    Returns:
        A list of User names (presumably the users' login emails; confirm
        for sites where User names are not email addresses).
    """
    # Imported locally so this function stands alone without touching the
    # module's import block.
    from frappe.utils.user import get_users_with_role

    return get_users_with_role("Operations Manager")

# Task (d): Weekly Performance Digest
The weekly task builds a per-franchise sales summary for the previous seven days,
emails each franchise manager their digest, then sends HQ a consolidated report.
Each agreement is wrapped in its own try/except so one bad franchise doesn’t abort
the whole batch.
import frappefrom frappe import _from frappe.utils import today, add_days, flt, fmt_money
def send_weekly_performance_digest():
    """
    Weekly scheduler task.
    Sends a performance summary email to each franchise owner
    and a consolidated report to HQ management.

    The reporting window is the seven days ending yesterday. Each agreement
    is processed inside its own try/except so one failure is logged without
    stopping the rest. An agreement with no manager email is still included
    in the HQ report; only its individual digest is skipped.
    """
    week_start = add_days(today(), -7)
    week_end = add_days(today(), -1)

    agreements = frappe.get_all(
        "Franchise Agreement",
        filters={"docstatus": 1, "agreement_status": "Active"},
        fields=[
            "name",
            "franchise_name",
            "franchisee",
            "territory",
            "franchise_manager_email",
            "franchise_warehouse",
        ],
    )

    all_summaries = []

    for agreement in agreements:
        try:
            summary = build_franchise_summary(agreement, week_start, week_end)
            all_summaries.append(summary)

            # Without an address there is nobody to send the individual
            # digest to; avoid calling sendmail with recipients=[None].
            if not agreement.franchise_manager_email:
                frappe.logger("scoopjoy").warning(
                    f"Weekly digest skipped for {agreement.franchise_name}: no manager email."
                )
                continue

            # Send individual digest to franchise manager
            frappe.sendmail(
                recipients=[agreement.franchise_manager_email],
                subject=_("ScoopJoy Weekly Digest: {0} ({1} to {2})").format(
                    agreement.franchise_name, week_start, week_end
                ),
                template="franchise_weekly_digest",
                args=summary,
            )
        except Exception:
            frappe.log_error(
                title=_("Weekly Digest Error: {0}").format(agreement.franchise_name),
            )

    # Consolidated HQ report
    if all_summaries:
        send_hq_consolidated_report(all_summaries, week_start, week_end)

    frappe.db.commit()
def build_franchise_summary(agreement, week_start, week_end):
    """Assemble the weekly sales figures for a single franchise agreement.

    Aggregates submitted POS Invoices linked to the agreement whose posting
    date falls between ``week_start`` and ``week_end`` (SQL BETWEEN), and
    returns a dict of raw and INR-formatted values for the email templates.
    """
    query_params = {
        "start": week_start,
        "end": week_end,
        "agreement": agreement.name,
    }
    rows = frappe.db.sql(
        """
        SELECT
            COUNT(*) as invoice_count,
            COALESCE(SUM(grand_total), 0) as total_revenue,
            COALESCE(AVG(grand_total), 0) as avg_ticket_size
        FROM `tabPOS Invoice`
        WHERE posting_date BETWEEN %(start)s AND %(end)s
          AND docstatus = 1
          AND custom_franchise_agreement = %(agreement)s
        """,
        query_params,
        as_dict=True,
    )
    # The aggregate query has no GROUP BY, so the first row holds the totals.
    totals = rows[0]

    summary = {
        "franchise_name": agreement.franchise_name,
        "territory": agreement.territory,
        "invoice_count": totals.invoice_count,
        "total_revenue": flt(totals.total_revenue),
        "total_revenue_formatted": fmt_money(totals.total_revenue, currency="INR"),
        "avg_ticket_size_formatted": fmt_money(totals.avg_ticket_size, currency="INR"),
        "week_start": week_start,
        "week_end": week_end,
    }
    return summary
def send_hq_consolidated_report(summaries, week_start, week_end):
    """Send consolidated report to HQ management.

    Args:
        summaries: Per-franchise summary dicts; each must carry a numeric
            ``total_revenue``.
        week_start: First day of the reporting window.
        week_end: Last day of the reporting window.

    Recipients are the users holding the Franchise Director role, resolved
    through Frappe's ``get_users_with_role`` helper so disabled users and the
    Administrator account are left out. Nothing is sent when no such user
    exists.
    """
    # Imported locally so this function stands alone without touching the
    # module's import block.
    from frappe.utils.user import get_users_with_role

    hq_emails = get_users_with_role("Franchise Director")
    if not hq_emails:
        return

    total_revenue = sum(s["total_revenue"] for s in summaries)

    frappe.sendmail(
        recipients=hq_emails,
        subject=_("ScoopJoy HQ Weekly Report: {0} to {1}").format(week_start, week_end),
        template="hq_weekly_digest",
        args={
            "summaries": summaries,
            "total_revenue_formatted": fmt_money(total_revenue, currency="INR"),
            "outlet_count": len(summaries),
            "week_start": week_start,
            "week_end": week_end,
        },
    )

# Task (e): Monthly Franchise Fee Invoices
The monthly task creates one recurring franchise-fee invoice per active agreement.
The frappe.db.exists check — keyed on the agreement, the Monthly Franchise Fee
invoice type, and a between filter on the current month — keeps it idempotent and
tracks how many were skipped.
import frappefrom frappe import _from frappe.utils import today, get_first_day, get_last_day, add_days, flt
def create_monthly_franchise_fee_invoices():
    """
    Monthly scheduler task.
    Creates recurring franchise fee invoices for all active agreements.

    An agreement qualifies when it is submitted, Active, and its start/end
    dates overlap the current month. An agreement that already has a
    non-cancelled "Monthly Franchise Fee" invoice posted in the current month
    is counted as skipped. Each agreement is committed or rolled back on its
    own, so a failure between ``insert`` and ``submit`` cannot leave a draft
    behind that would make later runs skip the agreement.
    """
    current_month_start = get_first_day(today())
    current_month_end = get_last_day(today())

    # Same label for every invoice in this run, so build it once. Falls back
    # to the raw value if it is not a date-like object.
    if hasattr(current_month_start, "strftime"):
        month_label = current_month_start.strftime("%B %Y")
    else:
        month_label = current_month_start

    agreements = frappe.get_all(
        "Franchise Agreement",
        filters={
            "docstatus": 1,
            "agreement_status": "Active",
            "start_date": ("<=", current_month_end),
            "end_date": (">=", current_month_start),
        },
        fields=["name", "franchise_name", "franchisee", "company", "franchise_fee"],
    )

    created = 0
    skipped = 0

    for agreement in agreements:
        try:
            # Idempotency: check if invoice already exists for this month
            existing = frappe.db.exists(
                "Sales Invoice",
                {
                    "custom_franchise_agreement": agreement.name,
                    "custom_invoice_type": "Monthly Franchise Fee",
                    "posting_date": ("between", [current_month_start, current_month_end]),
                    "docstatus": ("!=", 2),
                },
            )

            if existing:
                skipped += 1
                continue

            invoice = frappe.get_doc(
                {
                    "doctype": "Sales Invoice",
                    "customer": agreement.franchisee,
                    "company": agreement.company,
                    # ERPNext resets posting_date to the current date during
                    # validation unless set_posting_time is checked; set it
                    # so the invoice is dated on the first of the month.
                    "set_posting_time": 1,
                    "posting_date": current_month_start,
                    "due_date": add_days(current_month_start, 30),
                    "custom_franchise_agreement": agreement.name,
                    "custom_invoice_type": "Monthly Franchise Fee",
                    "items": [
                        {
                            "item_code": "Monthly Franchise Fee",
                            "qty": 1,
                            "rate": flt(agreement.franchise_fee),
                            "description": _("Monthly franchise fee for {0} — {1}").format(
                                agreement.franchise_name,
                                month_label,
                            ),
                        }
                    ],
                }
            )
            invoice.insert(ignore_permissions=True)
            invoice.submit()

            # Persist this invoice now so a later agreement's rollback
            # cannot undo it.
            frappe.db.commit()
            created += 1

        except Exception:
            # Drop any partially written invoice, then keep the error log.
            frappe.db.rollback()
            frappe.log_error(
                title=_("Monthly Fee Invoice Error: {0}").format(agreement.franchise_name),
            )
            frappe.db.commit()

    frappe.logger("scoopjoy").info(
        f"Monthly fee invoices: {created} created, {skipped} skipped (already exist)."
    )