Skip to content

Scheduled Task Recipes

Problem: ScoopJoy’s franchise operation needs five different recurring jobs — daily royalty invoicing, hourly POS stock sync, a 15-minute outlet heartbeat check, a weekly performance digest, and monthly franchise-fee invoices. Each runs on its own cadence and must be safe to re-run.

Solution: Declare every task in scheduler_events inside hooks.py, mixing named intervals (hourly, weekly, monthly) with cron expressions for finer control. Frappe’s scheduler dispatches each dotted path on its cadence; the task functions themselves stay idempotent and lightweight.

The scheduler_events dict maps a cadence to a list of dotted Python paths. The cron key takes a nested dict of cron expressions — here 0 6 * * * (daily at 6 AM) and */15 * * * * (every 15 minutes) — while hourly, weekly, and monthly are Frappe’s named intervals.

apps/scoopjoy/scoopjoy/hooks.py
# Maps a scheduler cadence to the dotted paths of the task functions to run.
# "cron" holds a nested mapping of cron expression -> task list; the other
# keys are named intervals.
scheduler_events = {
    "cron": {
        # (a) Daily at 6 AM: generate franchise royalty invoices
        "0 6 * * *": ["scoopjoy.tasks.royalty.generate_daily_royalty_invoices"],
        # (c) Every 15 minutes: check outlet health/heartbeat
        "*/15 * * * *": ["scoopjoy.tasks.health.check_outlet_heartbeat"],
    },
    # (b) Hourly: sync stock levels from POS terminals
    "hourly": ["scoopjoy.tasks.stock.hourly_pos_stock_sync"],
    # (d) Weekly: send franchise performance digest
    "weekly": ["scoopjoy.tasks.reports.send_weekly_performance_digest"],
    # (e) Monthly: auto-create recurring franchise fee invoices
    "monthly": ["scoopjoy.tasks.billing.create_monthly_franchise_fee_invoices"],
}

Task (a): Daily Royalty Invoice Generation

Section titled “Task (a): Daily Royalty Invoice Generation”

Driven by the 0 6 * * * cron entry. It pulls active franchise agreements, then creates one royalty Sales Invoice per franchise from yesterday’s POS sales. The frappe.db.exists guard at the top of create_royalty_invoice makes the whole run idempotent — a re-run on the same date creates nothing.

apps/scoopjoy/scoopjoy/tasks/royalty.py
import frappe
from frappe import _
from frappe.utils import today, add_days, flt, getdate
def generate_daily_royalty_invoices():
    """
    Scheduled daily at 6 AM via cron.

    Creates Sales Invoices for franchise royalty based on yesterday's POS sales,
    one per submitted, active Franchise Agreement whose start/end dates cover
    yesterday.

    Each agreement is processed in its own transaction: a success is committed
    immediately and a failure is rolled back before the error is logged, so one
    bad agreement neither aborts the batch nor leaves a half-built invoice
    behind.
    """
    yesterday = add_days(today(), -1)
    active_agreements = frappe.get_all(
        "Franchise Agreement",
        filters={
            "docstatus": 1,
            "agreement_status": "Active",
            "start_date": ("<=", yesterday),
            "end_date": (">=", yesterday),
        },
        fields=["name", "franchise_name", "franchisee", "company", "royalty_percentage", "territory"],
    )
    created = 0
    for agreement in active_agreements:
        try:
            invoice = create_royalty_invoice(agreement, yesterday)
        except Exception:
            # Discard anything written before the failure (e.g. an invoice that
            # was inserted but failed on submit). A leftover draft would match
            # the existence check in create_royalty_invoice and block every
            # retry for this date.
            frappe.db.rollback()
            frappe.log_error(
                title=_("Royalty Invoice Error: {0}").format(agreement.franchise_name),
            )
        else:
            if invoice:
                created += 1
        # Commit per agreement so a later rollback cannot undo earlier
        # invoices or drop an already-written error log.
        frappe.db.commit()
    frappe.logger("scoopjoy").info(
        f"Royalty invoices: {created}/{len(active_agreements)} created for {yesterday}"
    )
def create_royalty_invoice(agreement, date):
    """Create a single royalty invoice for one franchise agreement.

    Args:
        agreement: Franchise Agreement row providing ``name``, ``franchisee``,
            ``company``, ``royalty_percentage`` and ``franchise_name``.
        date: The sales date the royalty is computed for; also used as the
            invoice posting date and royalty date.

    Returns:
        The submitted Sales Invoice document, or ``None`` when a non-cancelled
        invoice already exists for this agreement and date, or when the
        agreement had no submitted POS sales on that date.
    """
    # Check if already created (idempotent); cancelled invoices do not count.
    existing = frappe.db.exists(
        "Sales Invoice",
        {
            "custom_franchise_agreement": agreement.name,
            "custom_royalty_date": date,
            "docstatus": ("!=", 2),
        },
    )
    if existing:
        return None

    # Calculate the day's total POS sales for this franchise
    total_sales = frappe.db.sql(
        """
        SELECT COALESCE(SUM(grand_total), 0) as total
        FROM `tabPOS Invoice`
        WHERE posting_date = %(date)s
        AND docstatus = 1
        AND custom_franchise_agreement = %(agreement)s
        """,
        {"date": date, "agreement": agreement.name},
        as_dict=True,
    )[0].total
    if flt(total_sales) == 0:
        return None

    royalty_amount = flt(total_sales) * flt(agreement.royalty_percentage) / 100
    invoice = frappe.get_doc(
        {
            "doctype": "Sales Invoice",
            "customer": agreement.franchisee,
            "company": agreement.company,
            # Without "Edit Posting Date and Time" ERPNext replaces
            # posting_date with the current date during validation, so the
            # invoice would be posted on the run date instead of the sales date.
            "set_posting_time": 1,
            "posting_date": date,
            "due_date": add_days(date, 15),
            "custom_franchise_agreement": agreement.name,
            "custom_royalty_date": date,
            "items": [
                {
                    "item_code": "Franchise Royalty Fee",
                    "qty": 1,
                    "rate": royalty_amount,
                    "description": _("Royalty at {0}% on sales of {1} for {2} ({3})").format(
                        agreement.royalty_percentage,
                        frappe.format_value(total_sales, {"fieldtype": "Currency"}),
                        agreement.franchise_name,
                        date,
                    ),
                }
            ],
        }
    )
    invoice.insert(ignore_permissions=True)
    invoice.submit()
    return invoice

The hourly task is the textbook lightweight dispatcher: it counts active outlets, then hands the heavy work to a background job via frappe.enqueue on the long queue. Using a fixed job_id plus deduplicate=True means a slow run won’t pile up duplicate jobs.

apps/scoopjoy/scoopjoy/tasks/stock.py
import frappe
from frappe import _
def hourly_pos_stock_sync():
    """
    Hourly scheduler task.

    Acts purely as a dispatcher: when at least one submitted, active Franchise
    Agreement exists, it enqueues the stock-sync job on the "long" queue under
    a fixed job id with deduplication enabled, then logs the agreement count.
    Does nothing when there are no active agreements.
    """
    agreement_filters = {"docstatus": 1, "agreement_status": "Active"}
    active_count = frappe.db.count("Franchise Agreement", agreement_filters)
    if active_count == 0:
        return

    job_options = {
        "queue": "long",
        "timeout": 1800,
        "job_id": "scoopjoy_stock_sync::hourly",
        "deduplicate": True,
    }
    frappe.enqueue("scoopjoy.stock_sync.process_stock_sync", **job_options)
    frappe.db.commit()

    message = f"Hourly stock sync enqueued for {active_count} outlets."
    frappe.logger("scoopjoy").info(message)

Task (c): Outlet Heartbeat Check (Every 15 Minutes)

Section titled “Task (c): Outlet Heartbeat Check (Every 15 Minutes)”

Driven by */15 * * * *. Each run flags outlets whose last heartbeat is older than 30 minutes, flips their status to Offline only on a transition, and sends a single consolidated alert rather than one email per outlet.

apps/scoopjoy/scoopjoy/tasks/health.py
import frappe
from frappe import _
from frappe.utils import now_datetime, time_diff_in_seconds
def check_outlet_heartbeat():
    """
    Cron: every 15 minutes.

    Check if franchise POS terminals have reported in recently.
    Flag outlets that haven't sent data in 30+ minutes, or that have never
    sent a heartbeat at all. An outlet's status is written only when it is
    not already "Offline"; all currently offline outlets are then reported
    in one consolidated alert.
    """
    threshold_seconds = 1800  # 30 minutes
    active_outlets = frappe.get_all(
        "Franchise Agreement",
        filters={"docstatus": 1, "agreement_status": "Active"},
        fields=[
            "name",
            "franchise_name",
            "franchise_manager_email",
            "custom_last_heartbeat",
            # Fetched here so the loop needs no per-outlet status lookup.
            "custom_outlet_status",
        ],
    )
    now = now_datetime()
    offline_outlets = []
    for outlet in active_outlets:
        last_heartbeat = outlet.custom_last_heartbeat
        if last_heartbeat and time_diff_in_seconds(now, last_heartbeat) <= threshold_seconds:
            continue

        # Either never reported or silent for longer than the threshold.
        offline_outlets.append(outlet)
        # Update status only if it was previously online
        if outlet.custom_outlet_status != "Offline":
            frappe.db.set_value(
                "Franchise Agreement", outlet.name, "custom_outlet_status", "Offline"
            )

    # Persist the status flips before mailing: the alert is sent synchronously
    # (now=True) and a mail failure must not discard them.
    frappe.db.commit()
    if offline_outlets:
        notify_operations_team(offline_outlets)
        frappe.db.commit()
    frappe.logger("scoopjoy").info(
        f"Heartbeat check: {len(offline_outlets)}/{len(active_outlets)} outlets offline."
    )
def notify_operations_team(offline_outlets):
    """Send a single consolidated alert to the operations team.

    Args:
        offline_outlets: Outlet rows exposing ``franchise_name``; one email
            listing all of them is sent immediately (``now=True``).

    When no operations recipients are found, nothing is sent and a warning is
    logged so the missing alert is visible instead of silently dropped.
    """
    recipients = get_operations_team_emails()
    if not recipients:
        frappe.logger("scoopjoy").warning(
            f"Outlet offline alert not sent: no operations recipients "
            f"({len(offline_outlets)} outlet(s) offline)."
        )
        return

    outlet_names = [o.franchise_name for o in offline_outlets]
    frappe.sendmail(
        recipients=recipients,
        subject=_("ScoopJoy Alert: {0} Outlet(s) Offline").format(len(offline_outlets)),
        template="outlet_offline_alert",
        args={
            "offline_outlets": outlet_names,
            "count": len(offline_outlets),
            "check_time": now_datetime(),
        },
        now=True,
    )
def get_operations_team_emails():
    """Get emails of enabled users with the Operations Manager role.

    Returns:
        A list of User names holding the role, restricted to enabled accounts
        so alerts are not sent to deactivated users. Empty when nobody holds
        the role.

    NOTE(review): the returned values are User document names, which are
    assumed to be the users' email addresses - confirm for this site.
    """
    role_holders = frappe.get_all(
        "Has Role",
        filters={"role": "Operations Manager", "parenttype": "User"},
        pluck="parent",
    )
    if not role_holders:
        return []
    # Role assignments survive account deactivation, so filter on User.enabled.
    return frappe.get_all(
        "User",
        filters={"name": ("in", role_holders), "enabled": 1},
        pluck="name",
    )

The weekly task builds a per-franchise sales summary for the previous seven days, emails each franchise manager their digest, then sends HQ a consolidated report. Each agreement is wrapped in its own try/except so one bad franchise doesn’t abort the whole batch.

apps/scoopjoy/scoopjoy/tasks/reports.py
import frappe
from frappe import _
from frappe.utils import today, add_days, flt, fmt_money
def send_weekly_performance_digest():
    """
    Weekly scheduler task.

    Sends a performance summary email to each franchise owner
    and a consolidated report to HQ management. The reporting window is the
    seven days ending yesterday.

    Each agreement is handled in its own try/except so one failure is logged
    without aborting the batch. An agreement without a manager email is still
    included in the HQ report; only its individual digest is skipped.
    """
    week_start = add_days(today(), -7)
    week_end = add_days(today(), -1)
    agreements = frappe.get_all(
        "Franchise Agreement",
        filters={"docstatus": 1, "agreement_status": "Active"},
        fields=[
            "name", "franchise_name", "franchisee", "territory",
            "franchise_manager_email", "franchise_warehouse",
        ],
    )
    all_summaries = []
    for agreement in agreements:
        try:
            summary = build_franchise_summary(agreement, week_start, week_end)
            all_summaries.append(summary)

            if not agreement.franchise_manager_email:
                # Nobody to address: skip the mail rather than send to [None].
                frappe.logger("scoopjoy").warning(
                    f"Weekly digest skipped for {agreement.franchise_name}: no manager email."
                )
                continue

            # Send individual digest to franchise manager
            frappe.sendmail(
                recipients=[agreement.franchise_manager_email],
                subject=_("ScoopJoy Weekly Digest: {0} ({1} to {2})").format(
                    agreement.franchise_name, week_start, week_end
                ),
                template="franchise_weekly_digest",
                args=summary,
            )
        except Exception:
            frappe.log_error(
                title=_("Weekly Digest Error: {0}").format(agreement.franchise_name),
            )
    # Consolidated HQ report
    if all_summaries:
        send_hq_consolidated_report(all_summaries, week_start, week_end)
    frappe.db.commit()
def build_franchise_summary(agreement, week_start, week_end):
    """Aggregate one franchise's submitted POS invoices for the given window.

    Returns a dict with the franchise name and territory, the invoice count,
    the numeric total revenue, INR-formatted total revenue and average ticket
    size, and the window boundaries.
    """
    query_params = {
        "start": week_start,
        "end": week_end,
        "agreement": agreement.name,
    }
    rows = frappe.db.sql(
        """
        SELECT
        COUNT(*) as invoice_count,
        COALESCE(SUM(grand_total), 0) as total_revenue,
        COALESCE(AVG(grand_total), 0) as avg_ticket_size
        FROM `tabPOS Invoice`
        WHERE posting_date BETWEEN %(start)s AND %(end)s
        AND docstatus = 1
        AND custom_franchise_agreement = %(agreement)s
        """,
        query_params,
        as_dict=True,
    )
    totals = rows[0]

    summary = {
        "franchise_name": agreement.franchise_name,
        "territory": agreement.territory,
        "invoice_count": totals.invoice_count,
    }
    summary["total_revenue"] = flt(totals.total_revenue)
    summary["total_revenue_formatted"] = fmt_money(totals.total_revenue, currency="INR")
    summary["avg_ticket_size_formatted"] = fmt_money(totals.avg_ticket_size, currency="INR")
    summary["week_start"] = week_start
    summary["week_end"] = week_end
    return summary
def send_hq_consolidated_report(summaries, week_start, week_end):
    """Send consolidated report to HQ management.

    Args:
        summaries: Per-franchise summary dicts, each with a numeric
            ``total_revenue``.
        week_start: First day of the reporting window.
        week_end: Last day of the reporting window.

    The report goes to enabled users holding the "Franchise Director" role;
    nothing is sent when there are none.
    """
    total_revenue = sum(s["total_revenue"] for s in summaries)
    directors = frappe.get_all(
        "Has Role",
        filters={"role": "Franchise Director", "parenttype": "User"},
        pluck="parent",
    )
    # Role assignments survive account deactivation, so keep enabled users only.
    hq_emails = (
        frappe.get_all(
            "User",
            filters={"name": ("in", directors), "enabled": 1},
            pluck="name",
        )
        if directors
        else []
    )
    if not hq_emails:
        return

    frappe.sendmail(
        recipients=hq_emails,
        subject=_("ScoopJoy HQ Weekly Report: {0} to {1}").format(week_start, week_end),
        template="hq_weekly_digest",
        args={
            "summaries": summaries,
            "total_revenue_formatted": fmt_money(total_revenue, currency="INR"),
            "outlet_count": len(summaries),
            "week_start": week_start,
            "week_end": week_end,
        },
    )

The monthly task creates one recurring franchise-fee invoice per active agreement. The frappe.db.exists check — keyed on the agreement, the Monthly Franchise Fee invoice type, and a between filter on the current month — keeps it idempotent and tracks how many were skipped.

apps/scoopjoy/scoopjoy/tasks/billing.py
import frappe
from frappe import _
from frappe.utils import today, get_first_day, get_last_day, add_days, flt
def create_monthly_franchise_fee_invoices():
    """
    Monthly scheduler task.

    Creates recurring franchise fee invoices for all active agreements whose
    start/end dates overlap the current month. An agreement that already has a
    non-cancelled "Monthly Franchise Fee" invoice posted in the current month
    is counted as skipped.

    Each invoice is committed on its own; a failure is rolled back before the
    error is logged so a half-built draft cannot block later retries.
    """
    current_month_start = get_first_day(today())
    current_month_end = get_last_day(today())
    # Loop-invariant label for the item description, e.g. "January 2025".
    month_label = (
        current_month_start.strftime("%B %Y")
        if hasattr(current_month_start, "strftime")
        else current_month_start
    )
    agreements = frappe.get_all(
        "Franchise Agreement",
        filters={
            "docstatus": 1,
            "agreement_status": "Active",
            "start_date": ("<=", current_month_end),
            "end_date": (">=", current_month_start),
        },
        fields=["name", "franchise_name", "franchisee", "company", "franchise_fee"],
    )
    created = 0
    skipped = 0
    for agreement in agreements:
        try:
            # Idempotency: check if invoice already exists for this month
            existing = frappe.db.exists(
                "Sales Invoice",
                {
                    "custom_franchise_agreement": agreement.name,
                    "custom_invoice_type": "Monthly Franchise Fee",
                    "posting_date": ("between", [current_month_start, current_month_end]),
                    "docstatus": ("!=", 2),
                },
            )
            if existing:
                skipped += 1
                continue
            invoice = frappe.get_doc(
                {
                    "doctype": "Sales Invoice",
                    "customer": agreement.franchisee,
                    "company": agreement.company,
                    # Without "Edit Posting Date and Time" ERPNext replaces
                    # posting_date with the current date during validation.
                    "set_posting_time": 1,
                    "posting_date": current_month_start,
                    "due_date": add_days(current_month_start, 30),
                    "custom_franchise_agreement": agreement.name,
                    "custom_invoice_type": "Monthly Franchise Fee",
                    "items": [
                        {
                            "item_code": "Monthly Franchise Fee",
                            "qty": 1,
                            "rate": flt(agreement.franchise_fee),
                            # Separator added: the name and month previously
                            # ran together ("...OutletJanuary 2025").
                            "description": _("Monthly franchise fee for {0} - {1}").format(
                                agreement.franchise_name,
                                month_label,
                            ),
                        }
                    ],
                }
            )
            invoice.insert(ignore_permissions=True)
            invoice.submit()
            # Commit per invoice so a later rollback cannot undo this one.
            frappe.db.commit()
            created += 1
        except Exception:
            # Drop any partially written invoice; a leftover draft would match
            # the existence check above and be skipped for the rest of the month.
            frappe.db.rollback()
            frappe.log_error(
                title=_("Monthly Fee Invoice Error: {0}").format(agreement.franchise_name),
            )
            # Persist the error log before the next agreement is processed.
            frappe.db.commit()
    frappe.logger("scoopjoy").info(
        f"Monthly fee invoices: {created} created, {skipped} skipped (already exist)."
    )