Skip to content

Scheduled Job Orchestration

Solution: Model each run as a Batch Run document with a child table of steps, drive the whole thing from a single scheduler_events cron entry, and execute the pipeline inside one long-queue background job that resolves dependencies and isolates each step in its own try/except + frappe.db.commit().

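The dependency graph below is what the orchestrator walks: solid arrows are depends_on edges. Send Reports and Sync to HQ have no dependencies, so they still run when invoice generation fails. However, like every remaining step, they are skipped if a critical step (Close POS Shifts or Calculate Royalties) fails and aborts the pipeline.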

Nightly batch pipeline
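(Diagram, in text form: Close POS Shifts → Calculate Royalties → Generate Invoices; Send Reports and Sync to HQ stand alone with no depends_on edges.)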

Two DocTypes track state: a Batch Run parent that holds the overall status and an execution log, and a Batch Run Step child table (istable: 1) with one row per pipeline step. The step row carries is_critical, retry_count, and a per-step status so the dashboard can show exactly where a run stalled.

scoopjoy/scoopjoy/doctype/batch_run/batch_run.json
{
"name": "Batch Run",
"module": "ScoopJoy",
"autoname": "SJ-BATCH-.YYYY.-.#####",
"fields": [
{ "fieldname": "run_date", "fieldtype": "Date", "label": "Run Date", "reqd": 1, "default": "Today" },
{
"fieldname": "status",
"fieldtype": "Select",
"label": "Status",
"options": "\nQueued\nRunning\nCompleted\nCompleted with Errors\nFailed",
"default": "Queued",
"in_list_view": 1
},
{ "fieldname": "started_at", "fieldtype": "Datetime", "label": "Started At", "read_only": 1 },
{ "fieldname": "completed_at", "fieldtype": "Datetime", "label": "Completed At", "read_only": 1 },
{ "fieldname": "triggered_by", "fieldtype": "Link", "label": "Triggered By", "options": "User", "read_only": 1 },
{ "fieldname": "section_steps", "fieldtype": "Section Break", "label": "Pipeline Steps" },
{ "fieldname": "steps", "fieldtype": "Table", "label": "Steps", "options": "Batch Run Step" },
{ "fieldname": "section_log", "fieldtype": "Section Break", "label": "Execution Log", "collapsible": 1 },
{ "fieldname": "execution_log", "fieldtype": "Long Text", "label": "Execution Log", "read_only": 1 }
]
}
scoopjoy/scoopjoy/doctype/batch_run_step/batch_run_step.json
{
"name": "Batch Run Step",
"module": "ScoopJoy",
"istable": 1,
"fields": [
{ "fieldname": "step_name", "fieldtype": "Data", "label": "Step", "in_list_view": 1, "read_only": 1 },
{ "fieldname": "step_order", "fieldtype": "Int", "label": "Order", "in_list_view": 1, "read_only": 1 },
{
"fieldname": "status",
"fieldtype": "Select",
"label": "Status",
"options": "\nPending\nRunning\nSuccess\nFailed\nSkipped",
"default": "Pending",
"in_list_view": 1
},
{ "fieldname": "started_at", "fieldtype": "Datetime", "label": "Started At", "read_only": 1 },
{ "fieldname": "completed_at", "fieldtype": "Datetime", "label": "Completed At", "read_only": 1 },
{ "fieldname": "error_message", "fieldtype": "Long Text", "label": "Error", "read_only": 1 },
{ "fieldname": "is_critical", "fieldtype": "Check", "label": "Critical (stops pipeline on failure)", "read_only": 1 },
{ "fieldname": "retry_count", "fieldtype": "Int", "label": "Retry Count", "default": 0, "read_only": 1 }
]
}

The orchestrator is plain Python. PIPELINE_STEPS is the source of truth: each entry declares a display name, the dotted method path to run, a critical flag, and a depends_on list. The entry point run_nightly_pipeline creates the Batch Run record then hands execution to a long-queue job — the cron call returns immediately while the work happens in the background.

scoopjoy/scoopjoy/batch_pipeline.py
import frappe
from frappe.utils import now_datetime
import traceback
# Ordered pipeline definition walked by _execute_pipeline. Every entry is a
# dict with exactly these keys:
#   name        -- display name; also the step_name of the Batch Run Step row
#   method      -- dotted path resolved with frappe.get_attr when the step runs
#   critical    -- True: a failure aborts the run and marks later steps Skipped
#   depends_on  -- names of earlier steps that must end in "Success" first
PIPELINE_STEPS = [
    {
        "name": step_name,
        "method": "scoopjoy.scoopjoy.batch_pipeline." + function_name,
        "critical": is_critical,
        "depends_on": prerequisites,
    }
    for step_name, function_name, is_critical, prerequisites in (
        ("Close POS Shifts", "step_close_pos", True, []),
        ("Calculate Royalties", "step_calculate_royalties", True, ["Close POS Shifts"]),
        # Non-critical: alert but continue.
        ("Generate Invoices", "step_generate_invoices", False, ["Calculate Royalties"]),
        # Independent -- can run even if invoices fail.
        ("Send Reports", "step_send_reports", False, []),
        # Independent.
        ("Sync to HQ", "step_sync_to_hq", False, []),
    )
]
def run_nightly_pipeline():
    """Start one nightly pipeline run and return the new Batch Run's name.

    Wired to scheduler_events -> cron (after midnight) and also reused by
    trigger_pipeline_manually. The Batch Run document is created and committed
    here; the steps themselves run later in a "long"-queue background job
    (timeout 3600 s), so this call returns immediately.
    """
    batch_name = _create_batch_run().name
    # NOTE(review): job_id embeds the freshly created batch name, which is
    # unique on every call, so deduplicate=True can never match an existing
    # job -- it does not prevent two pipelines from running concurrently.
    frappe.enqueue(
        "scoopjoy.scoopjoy.batch_pipeline._execute_pipeline",
        batch_name=batch_name,
        job_id=f"nightly_pipeline_{batch_name}",
        deduplicate=True,
        queue="long",
        timeout=3600,
    )
    return batch_name
@frappe.whitelist()
def trigger_pipeline_manually():
    """Start the nightly pipeline on demand (API call or "Run Pipeline Now").

    The pipeline closes POS shifts and posts accounting entries, so the caller
    must be allowed to create Batch Run documents. Without this check, any
    logged-in user could reach this whitelisted endpoint.

    Returns:
        dict with a human-readable "message" and the new "batch_run" name.

    Raises:
        frappe.PermissionError: if the session user may not create a Batch Run.
    """
    frappe.has_permission("Batch Run", "create", throw=True)
    batch_name = run_nightly_pipeline()
    return {"message": f"Pipeline started: {batch_name}", "batch_run": batch_name}
@frappe.whitelist()
def retry_failed_step(batch_name, step_name):
    """Enqueue a background re-run of one Failed step of a Batch Run.

    Args:
        batch_name: name of the Batch Run document.
        step_name: step_name of a row in that batch whose status is "Failed".

    Returns:
        dict with a human-readable "message".

    Raises:
        frappe.PermissionError: if the caller may not write this Batch Run.
        frappe.ValidationError (via frappe.throw): if the batch is still
            Queued/Running, or no Failed row named step_name exists.
    """
    batch = frappe.get_doc("Batch Run", batch_name)
    # Whitelisted endpoint: restrict retries to users who may edit the batch.
    batch.check_permission("write")

    # While the pipeline job is still active it keeps saving this document;
    # a concurrent retry would race it on the same rows.
    if batch.status in ("Queued", "Running"):
        frappe.throw(
            f"Batch {batch_name} is {batch.status}; wait for it to finish before retrying a step"
        )

    has_failed_row = any(
        row.step_name == step_name and row.status == "Failed" for row in batch.steps
    )
    if not has_failed_row:
        frappe.throw(f"No failed step '{step_name}' found in batch {batch_name}")

    frappe.enqueue(
        "scoopjoy.scoopjoy.batch_pipeline._execute_single_step",
        queue="long",
        timeout=600,
        batch_name=batch_name,
        step_name=step_name,
        # Stable id: a double-click must not run the same step twice at once
        # (that could duplicate whatever the step posts).
        job_id=f"batch_retry_{batch_name}_{frappe.scrub(step_name)}",
        deduplicate=True,
    )
    return {"message": f"Retry enqueued for step: {step_name}"}

_create_batch_run materializes the step list into child rows so the document mirrors PIPELINE_STEPS, then commits. The background worker loads the Batch Run by name in its own transaction, so the document and its row IDs must already be committed when the job is picked up.

scoopjoy/scoopjoy/batch_pipeline.py (continued)
def _create_batch_run(run_date=None):
    """Create, insert and commit a Queued Batch Run with one Pending row per step.

    Args:
        run_date: business date the pipeline steps should process (it is passed
            to every step as ``run_date``). Defaults to the previous day: the
            pipeline is scheduled after midnight, and the steps filter POS
            shifts and Sales Invoices by this date. Using "today" would process
            the day that has only just started instead of the one that ended.

    Returns:
        The inserted Batch Run document.
    """
    if run_date is None:
        run_date = frappe.utils.add_days(frappe.utils.today(), -1)

    # One child row per PIPELINE_STEPS entry, in declaration order (1-based).
    step_rows = [
        {
            "step_name": step_def["name"],
            "step_order": order,
            "status": "Pending",
            "is_critical": step_def.get("critical", False),
        }
        for order, step_def in enumerate(PIPELINE_STEPS, start=1)
    ]
    batch = frappe.get_doc({
        "doctype": "Batch Run",
        "run_date": run_date,
        "status": "Queued",
        "triggered_by": frappe.session.user,
        "steps": step_rows,
    })
    batch.insert(ignore_permissions=True)
    # Commit now: the background job reads this document in its own transaction.
    frappe.db.commit()
    return batch

_execute_pipeline is the heart of the orchestrator. It walks the steps in order, tracks each outcome in completed_steps, skips a step whose dependencies did not succeed, and — when a critical step fails — sets pipeline_aborted so every remaining step is marked Skipped.

scoopjoy/scoopjoy/batch_pipeline.py (continued)
def _execute_pipeline(batch_name):
    """Run every PIPELINE_STEPS entry for one Batch Run, honouring dependencies.

    Steps run in list order. A step is marked Skipped when a critical step has
    already failed, or when any of its ``depends_on`` steps did not end in
    "Success". Each status change is saved and committed immediately so the
    Batch Run form shows progress while the job runs.

    Final batch status:
      * "Failed" if a critical step failed,
      * "Completed with Errors" if only non-critical steps failed,
      * "Completed" otherwise.

    If something outside a step's own error handling raises (for example the
    failure alert cannot be sent), the batch is marked "Failed" rather than
    being left in "Running", and the exception is re-raised so the background
    job is reported as failed.

    Args:
        batch_name: name of an existing Batch Run document.
    """
    batch = frappe.get_doc("Batch Run", batch_name)
    batch.status = "Running"
    batch.started_at = now_datetime()
    batch.save(ignore_permissions=True)
    frappe.db.commit()

    try:
        has_errors, pipeline_aborted = _run_pipeline_steps(batch)
    except Exception as e:
        # Drop the half-finished transaction and record the crash on a fresh
        # copy, so a stale in-memory document cannot block the status update.
        frappe.db.rollback()
        batch = frappe.get_doc("Batch Run", batch_name)
        _append_log(batch, f"Pipeline crashed: {e}")
        _finish_batch(batch, "Failed")
        raise

    if pipeline_aborted:
        final_status = "Failed"
    elif has_errors:
        final_status = "Completed with Errors"
    else:
        final_status = "Completed"
    # No reload here: the in-memory document may hold log lines appended after
    # the last save (e.g. a failure of the final step), and reloading would
    # silently discard them.
    _finish_batch(batch, final_status)


def _run_pipeline_steps(batch):
    """Walk PIPELINE_STEPS once for *batch*; return ``(has_errors, pipeline_aborted)``."""
    outcomes = {}  # step_name -> "Success" | "Failed" | "Skipped"
    has_errors = False
    pipeline_aborted = False
    for step_def in PIPELINE_STEPS:
        step_name = step_def["name"]
        step_row = _get_step_row(batch, step_name)

        if pipeline_aborted:
            _skip_step(batch, step_row, "Skipped due to critical step failure")
            outcomes[step_name] = "Skipped"
            continue

        unmet = [
            dep for dep in step_def.get("depends_on", [])
            if outcomes.get(dep) != "Success"
        ]
        if unmet:
            _skip_step(batch, step_row, f"Dependency not met: {', '.join(unmet)}")
            outcomes[step_name] = "Skipped"
            continue

        success = _run_step(batch, step_row, step_def)
        outcomes[step_name] = "Success" if success else "Failed"
        if success:
            continue

        has_errors = True
        if step_def.get("critical"):
            pipeline_aborted = True
            _append_log(batch, f"CRITICAL FAILURE in '{step_name}' -- aborting pipeline")
            _send_pipeline_alert(batch, step_name, critical=True)
        else:
            _append_log(batch, f"Non-critical failure in '{step_name}' -- continuing")
            _send_pipeline_alert(batch, step_name, critical=False)
    return has_errors, pipeline_aborted


def _skip_step(batch, step_row, reason):
    """Mark *step_row* Skipped with *reason*, then save and commit *batch*."""
    step_row.status = "Skipped"
    step_row.error_message = reason
    batch.save(ignore_permissions=True)
    frappe.db.commit()


def _finish_batch(batch, status):
    """Stamp *status* and completion time on *batch*, notify dashboards, commit."""
    batch.completed_at = now_datetime()
    batch.status = status
    batch.save(ignore_permissions=True)
    # after_commit=True: the event goes out with the commit below, so open
    # dashboards never see a status that was not persisted.
    frappe.publish_realtime(
        "scoopjoy_batch_complete",
        {"batch": batch.name, "status": batch.status},
        after_commit=True,
    )
    frappe.db.commit()

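_run_step runs one step’s method via frappe.get_attr, wrapping it in try/except. On failure it does four things:

- It rolls back the current transaction. Work the step already committed itself is not undone.
- It records the traceback on the row.
- It bumps retry_count, which therefore counts failed attempts.
- It logs the error.

In every case it commits the status update, so the dashboard reflects reality even mid-failure.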

scoopjoy/scoopjoy/batch_pipeline.py (continued)
def _run_step(batch, step_row, step_def):
    """Run one pipeline step and record its outcome on *step_row*.

    The row is saved as "Running" and committed before the step method is
    called, so the dashboard shows it in progress. The method is resolved from
    ``step_def["method"]`` with frappe.get_attr and called with
    ``batch_name`` and ``run_date`` keyword arguments.

    On success the row becomes "Success" and any error text left by an earlier
    failed attempt is cleared. On an exception from the step, the current
    transaction is rolled back (work the step already committed itself is
    kept), the row becomes "Failed" with the message and traceback,
    retry_count is incremented and an Error Log is written.

    Args:
        batch: the Batch Run document that owns *step_row*.
        step_row: the Batch Run Step child row for this step.
        step_def: the matching PIPELINE_STEPS entry.

    Returns:
        True if the step method returned without raising, else False.
    """
    step_name = step_def["name"]
    step_row.status = "Running"
    step_row.started_at = now_datetime()
    batch.save(ignore_permissions=True)
    frappe.db.commit()

    # Only the step itself is guarded: an error while recording success must
    # not be reported as a step failure (and invite a retry that repeats work
    # the step has already done).
    try:
        method = frappe.get_attr(step_def["method"])
        method(batch_name=batch.name, run_date=batch.run_date)
    except Exception as e:
        tb = traceback.format_exc()
        frappe.db.rollback()
        step_row.status = "Failed"
        step_row.completed_at = now_datetime()
        step_row.error_message = f"{str(e)}\n\n{tb}"
        step_row.retry_count = (step_row.retry_count or 0) + 1
        _append_log(batch, f"Step '{step_name}' FAILED: {str(e)}")
        batch.save(ignore_permissions=True)
        frappe.db.commit()
        frappe.log_error(
            title=f"Batch Pipeline Step Failed: {step_name}",
            message=tb,
            reference_doctype="Batch Run",
            reference_name=batch.name,
        )
        return False

    step_row.status = "Success"
    step_row.completed_at = now_datetime()
    # A successful retry must not keep showing the previous attempt's traceback.
    step_row.error_message = None
    _append_log(batch, f"Step '{step_name}' completed successfully")
    batch.save(ignore_permissions=True)
    frappe.db.commit()
    return True
def _execute_single_step(batch_name, step_name):
    """Re-run one step of an existing Batch Run (background job for retries).

    After the step runs, the overall batch status is recomputed from the rows:
      * any Failed row marked is_critical -> "Failed",
      * any other Failed row              -> "Completed with Errors",
      * every row Success                 -> "Completed",
      * otherwise (e.g. rows still Skipped behind a retried critical step)
        the current status is kept.

    Args:
        batch_name: name of the Batch Run document.
        step_name: step_name of the row to re-run.

    Raises:
        frappe.ValidationError (via frappe.throw): if the batch has no such
            row, or the step is not defined in PIPELINE_STEPS (e.g. it was
            renamed or removed after the batch was created).
    """
    batch = frappe.get_doc("Batch Run", batch_name)
    step_row = _get_step_row(batch, step_name)
    step_def = next((s for s in PIPELINE_STEPS if s["name"] == step_name), None)
    if step_def is None:
        frappe.throw(f"Step '{step_name}' is not defined in PIPELINE_STEPS")
    _run_step(batch, step_row, step_def)

    # Recompute overall batch status from the freshly saved rows.
    batch.reload()
    failed_rows = [row for row in batch.steps if row.status == "Failed"]
    if any(row.is_critical for row in failed_rows):
        # A critical step is still failing: do not downgrade a "Failed" batch.
        batch.status = "Failed"
    elif failed_rows:
        batch.status = "Completed with Errors"
    elif all(row.status == "Success" for row in batch.steps):
        batch.status = "Completed"
    batch.save(ignore_permissions=True)
    frappe.db.commit()
def _get_step_row(batch, step_name):
for row in batch.steps:
if row.step_name == step_name:
return row
frappe.throw(f"Step '{step_name}' not found in batch {batch.name}")
def _append_log(batch, message):
    """Append a "[YYYY-MM-DD HH:MM:SS] message" line to batch.execution_log.

    Only the in-memory document is changed; callers save the batch themselves.
    """
    existing = batch.execution_log or ""
    entry = f"[{now_datetime():%Y-%m-%d %H:%M:%S}] {message}\n"
    batch.execution_log = f"{existing}{entry}"
def _send_pipeline_alert(batch, step_name, critical=False):
    """Email enabled System Managers that *step_name* failed in *batch*.

    Best effort: the alert is a side channel, so a missing recipient list or a
    mail error is recorded in the Error Log instead of being raised. An alert
    failure must not crash the pipeline and leave the batch stuck in "Running".

    Args:
        batch: the Batch Run document (used for its name and form link).
        step_name: display name of the failed step.
        critical: True when the failure aborted the pipeline.
    """
    subject = (
        f"[CRITICAL] Batch pipeline failed at: {step_name}"
        if critical
        else f"[WARNING] Batch step failed: {step_name}"
    )

    role_holders = frappe.get_all(
        "Has Role",
        filters={"role": "System Manager", "parenttype": "User"},
        pluck="parent",
    )
    # Resolve to enabled users' email addresses (a user's name is not always
    # an address, e.g. Administrator).
    recipients = []
    if role_holders:
        recipients = [
            email
            for email in frappe.get_all(
                "User",
                filters={"name": ["in", role_holders], "enabled": 1},
                pluck="email",
            )
            if email
        ]
    if not recipients:
        frappe.log_error(
            title=f"Batch Pipeline Alert Not Sent: {step_name}",
            message="No enabled System Manager with an email address to notify.",
            reference_doctype="Batch Run",
            reference_name=batch.name,
        )
        return

    try:
        frappe.sendmail(
            recipients=recipients,
            subject=subject,
            message=(
                f"<p>Batch Run: <strong>{batch.name}</strong></p>"
                f"<p>Failed Step: <strong>{step_name}</strong></p>"
                f"<p>{'Pipeline aborted.' if critical else 'Pipeline continuing with remaining steps.'}</p>"
                f"<p><a href='{frappe.utils.get_url_to_form('Batch Run', batch.name)}'>View Batch Run</a></p>"
            ),
            now=True,
        )
    except Exception:
        frappe.log_error(
            title=f"Batch Pipeline Alert Failed: {step_name}",
            reference_doctype="Batch Run",
            reference_name=batch.name,
        )

The step implementations are ordinary Frappe document logic — closing POS shifts, posting royalty journal entries, annotating invoices, emailing managers, and syncing totals to HQ. Each ends with its own frappe.db.commit() (the part the orchestrator relies on). Below is the critical-path pair; the remaining steps follow the same shape.

scoopjoy/scoopjoy/batch_pipeline.py (continued)
def step_close_pos(batch_name, run_date):
    """Submit a POS Closing Entry for each POS Opening Entry still Open on *run_date*.

    ``batch_name`` is part of the orchestrator's calling convention and is not
    used here. Commits once after all shifts are closed.
    """
    for opening_name in frappe.get_all(
        "POS Opening Entry",
        filters={"posting_date": run_date, "status": "Open"},
        pluck="name",
    ):
        opening = frappe.get_doc("POS Opening Entry", opening_name)
        closing_entry = frappe.get_doc({
            "doctype": "POS Closing Entry",
            "pos_opening_entry": opening.name,
            "posting_date": run_date,
            "user": opening.user,
            "pos_profile": opening.pos_profile,
            "company": opening.company,
        })
        # NOTE(review): ERPNext's POS Closing Entry typically also requires
        # period_start_date / period_end_date and populated transaction and
        # payment tables -- confirm this sequence yields a valid, submittable
        # closing entry on the ERPNext version in use.
        closing_entry.get_payment_reconciliation_details()
        closing_entry.insert(ignore_permissions=True)
        closing_entry.submit()
    frappe.db.commit()
def step_calculate_royalties(batch_name, run_date):
    """Post one royalty Journal Entry per active franchise with sales on *run_date*.

    Franchises are enabled Customers in the "Franchise" customer group. For each
    one, the grand totals of submitted Sales Invoices dated run_date are
    summed. A positive total is charged at custom_royalty_percentage, falling
    back to 5% when that value is empty or 0. The royalty is then posted as a
    submitted Journal Entry: Debtors debited against the franchise,
    Royalty Income credited. Commits once at the end.

    ``batch_name`` is part of the orchestrator's calling convention and unused.

    NOTE(review): not idempotent -- running this twice for the same run_date
    posts duplicate Journal Entries. The account names are hard-coded for the
    "SJ" company abbreviation.
    """
    # Same value for every entry; look it up once instead of once per franchise.
    company = frappe.defaults.get_defaults().company
    franchises = frappe.get_all(
        "Customer",
        filters={"customer_group": "Franchise", "disabled": 0},
        fields=["name", "customer_name"],
    )
    for franchise in franchises:
        total_sales = frappe.db.sql(
            """
            SELECT COALESCE(SUM(grand_total), 0) as total
            FROM `tabSales Invoice`
            WHERE customer = %s AND posting_date = %s AND docstatus = 1
            """,
            (franchise.name, run_date),
        )[0][0]
        if total_sales <= 0:
            continue
        royalty_rate = frappe.db.get_value(
            "Customer", franchise.name, "custom_royalty_percentage"
        ) or 5  # Default 5%
        royalty_amount = total_sales * (royalty_rate / 100)
        frappe.get_doc({
            "doctype": "Journal Entry",
            "posting_date": run_date,
            "voucher_type": "Journal Entry",
            "company": company,
            "user_remark": f"Royalty for {franchise.customer_name} on {run_date}",
            "accounts": [
                {
                    # Income line: party fields belong on the Receivable row
                    # only; ERPNext may reject them on an income account.
                    "account": "Royalty Income - SJ",
                    "credit_in_account_currency": royalty_amount,
                },
                {
                    "account": "Debtors - SJ",
                    "debit_in_account_currency": royalty_amount,
                    "party_type": "Customer",
                    "party": franchise.name,
                },
            ],
        }).insert(ignore_permissions=True).submit()
    frappe.db.commit()
# step_generate_invoices, step_send_reports, step_sync_to_hq follow the same
# pattern: do the work, then frappe.db.commit(). (rest unchanged)

A single scheduler_events entry wires the pipeline to a cron expression. Using the cron key lets you pick an exact time. Here, 0 1 * * * runs the pipeline at 1:00 AM every night in the site's system time zone, which is 1:00 AM IST only if that time zone is Asia/Kolkata. (Frappe also offers named buckets like daily and hourly, but cron gives precise control.)

scoopjoy/hooks.py
# Nightly pipeline trigger. The "0 1 * * *" expression means 01:00 every day;
# calling that "1:00 AM IST" assumes the site's system time zone is IST
# (Asia/Kolkata) -- confirm in System Settings.
scheduler_events = dict(
    cron={
        "0 1 * * *": ["scoopjoy.scoopjoy.batch_pipeline.run_nightly_pipeline"],
    },
)

The form script adds a per-failed-step retry button, a primary “Run Pipeline Now” action on new documents, and a 5-second auto-refresh while a run is in progress — turning the Batch Run form into a live dashboard.

scoopjoy/scoopjoy/doctype/batch_run/batch_run.js
// Batch Run form script: per-step retry buttons, a "Run Pipeline Now" action
// on new documents, and automatic reloading while a run is Queued/Running.

// Delay between reloads while the pipeline is active.
const BATCH_RUN_POLL_MS = 5000;

// Cancel this form's pending reload timer, if one is scheduled.
function clear_batch_run_poll(frm) {
    if (frm._refresh_timeout) {
        clearTimeout(frm._refresh_timeout);
        frm._refresh_timeout = null;
    }
}

// True while the current route still points at this Batch Run form.
function is_batch_run_form_open(frm) {
    const route = frappe.get_route() || [];
    return route[0] === "Form" && route[1] === frm.doctype && route[2] === frm.doc.name;
}

frappe.ui.form.on("Batch Run", {
    refresh: function (frm) {
        // reload_doc() re-runs this handler. Cancel the previous timer first so
        // only one poll is ever pending (re-arming a setInterval here on every
        // refresh would multiply the timers).
        clear_batch_run_poll(frm);

        const in_progress = ["Queued", "Running"].includes(frm.doc.status);

        // Retry buttons for failed steps. They are hidden while the batch is
        // still active, because a retry would then race the pipeline job
        // writing to the same document.
        if (frm.doc.steps && !in_progress) {
            frm.doc.steps
                .filter((step) => step.status === "Failed")
                .forEach((step) => {
                    frm.add_custom_button(
                        `Retry: ${step.step_name}`,
                        () => {
                            frappe.call({
                                method: "scoopjoy.scoopjoy.batch_pipeline.retry_failed_step",
                                args: {
                                    batch_name: frm.doc.name,
                                    step_name: step.step_name,
                                },
                                callback: (r) => {
                                    frappe.show_alert({ message: r.message.message, indicator: "blue" });
                                    setTimeout(() => frm.reload_doc(), 2000);
                                },
                            });
                        },
                        "Retry Failed Steps"
                    );
                });
        }

        // Manual trigger on a brand-new document. The new doc's default status
        // is "Queued", so return before the polling below.
        if (frm.is_new()) {
            frm.page.set_primary_action("Run Pipeline Now", () => {
                frappe.call({
                    method: "scoopjoy.scoopjoy.batch_pipeline.trigger_pipeline_manually",
                    callback: (r) => {
                        frappe.set_route("Form", "Batch Run", r.message.batch_run);
                    },
                });
            });
            return;
        }

        // Auto-refresh while the run is waiting for a worker or executing.
        // Single-shot timer: each reload triggers refresh, which re-arms it
        // only if the batch is still active, so the status is never checked
        // before the asynchronous reload has finished.
        if (in_progress) {
            frm._refresh_timeout = setTimeout(() => {
                frm._refresh_timeout = null;
                // Stop polling once the user has navigated away from this form.
                if (is_batch_run_form_open(frm)) {
                    frm.reload_doc();
                }
            }, BATCH_RUN_POLL_MS);
        }
    },
    // NOTE(review): confirm this Frappe version fires `onclose`; the route
    // check above stops polling after navigation either way.
    onclose: function (frm) {
        clear_batch_run_poll(frm);
    },
});