Skip to content

File Upload & Processing Pipeline

Problem: ScoopJoy outlets submit daily sales as CSV files. You need to accept each upload, validate it, turn the rows into Sales Invoices in bulk, and report progress and errors back to the uploader without timing out the request.

Solution: Split the work into four pieces — a thin upload endpoint that accepts the file and enqueues a job, a background worker that parses the CSV and posts invoices, a real-time progress channel over WebSocket, and a status endpoint the client can poll. The upload endpoint returns a job_id immediately; the heavy lifting happens in a worker on the long queue.

Bulk upload pipeline
Rendering diagram…

This recipe lives in one module plus a client script:

  • ice_cream_shop/
    • api/
      • v1/
        • bulk_upload.py — upload endpoint + background processor
    • public/
      • js/
        • bulk_upload_progress.js — realtime progress listener

The endpoint does only fast work — argument checks, a header sniff, persisting the file, and enqueuing the job. It rate-limits to 10 uploads per 5 minutes, validates that the outlet exists, confirms the file is a .csv, and reads it with utf-8-sig so a leading BOM from Excel exports is stripped. It then stashes a job record in Redis (keyed bulk_upload:{job_id}) and returns a status_url for polling.

ice_cream_shop/api/v1/bulk_upload.py
import frappe
from frappe import _
from ice_cream_shop.utils.api_response import success, bad_request, handle_api_errors
from ice_cream_shop.utils.rate_limiter import rate_limit
@frappe.whitelist(methods=["POST"])
@handle_api_errors
@rate_limit(max_requests=10, window_seconds=300)
def upload_daily_sales():
    """
    Upload a CSV of daily sales for batch processing.

    POST /api/method/ice_cream_shop.api.v1.bulk_upload.upload_daily_sales
    Header: Authorization: token <api_key>:<api_secret>
    Content-Type: multipart/form-data
    Body: file=<daily_sales.csv>, outlet=SJ-MUM-001

    CSV format:
        customer,item_code,qty,rate,payment_mode,date
        Walk-in,SCOOP-VAN-001,2,150.00,Cash,2025-03-15

    Only cheap checks run in the request: the outlet must exist, the file
    must be a UTF-8 ``.csv`` with a header row containing every required
    column, and there must be at least one data row. The file is then saved
    as a private File document, a job record is written to Redis under
    ``bulk_upload:{job_id}`` (24-hour TTL), and ``process_sales_csv`` is
    enqueued on the ``long`` queue.

    Returns:
        A ``success`` response carrying ``job_id``, ``file_name``,
        ``total_rows`` and a ``status_url`` to poll, or a ``bad_request``
        response describing the first validation failure.
    """
    import csv
    import io
    import os

    outlet = frappe.form_dict.get("outlet")
    if not outlet:
        return bad_request("'outlet' parameter is required.")
    if not frappe.db.exists("ScoopJoy Outlet", outlet):
        return bad_request(f"Outlet '{outlet}' not found.")

    files = frappe.request.files
    if not files or "file" not in files:
        return bad_request("No CSV file uploaded. Send with key 'file'.")
    uploaded_file = files["file"]

    # A multipart part may arrive without a filename; treat that as "no extension".
    ext = os.path.splitext(uploaded_file.filename or "")[1].lower()
    if ext != ".csv":
        return bad_request(f"Expected .csv file, got '{ext}'.")

    try:
        # utf-8-sig drops the BOM Excel adds. Stripping here (not only for the
        # sniff below) means the stored file starts at the header row, which is
        # where the worker's csv.DictReader expects to find it.
        content = uploaded_file.read().decode("utf-8-sig").strip()
    except UnicodeDecodeError:
        return bad_request("File is not valid UTF-8. Re-export the CSV with UTF-8 encoding.")

    # Quick validation: parse with the csv module so quoted headers and blank
    # lines are treated exactly as the background worker will treat them.
    try:
        rows = [row for row in csv.reader(io.StringIO(content)) if row]
    except csv.Error as exc:
        return bad_request(f"Could not parse CSV: {exc}")
    if len(rows) < 2:
        return bad_request("CSV must have a header row and at least one data row.")

    header = {col.strip().lower() for col in rows[0]}
    required_cols = {"customer", "item_code", "qty", "rate", "payment_mode", "date"}
    missing = required_cols - header
    if missing:
        # Sorted so the message is stable from one request to the next.
        return bad_request(f"Missing required columns: {', '.join(sorted(missing))}")
    row_count = len(rows) - 1

    # Save file for background processing
    file_doc = frappe.get_doc({
        "doctype": "File",
        "file_name": f"daily_sales_{outlet}_{frappe.utils.nowdate()}.csv",
        "content": content.encode("utf-8"),
        "is_private": 1,
    })
    file_doc.save(ignore_permissions=False)

    # Create a processing job tracker in Redis
    job_id = frappe.generate_hash(length=12)
    frappe.cache.set_value(f"bulk_upload:{job_id}", {
        "status": "queued",
        "file": file_doc.name,
        "outlet": outlet,
        "total_rows": row_count,
        "processed": 0,
        "succeeded": 0,
        "failed": 0,
        "errors": [],
        "invoices_created": [],
    }, expires_in_sec=86400)

    # Enqueue background processing
    frappe.enqueue(
        "ice_cream_shop.api.v1.bulk_upload.process_sales_csv",
        queue="long",
        timeout=600,
        job_id=job_id,
        file_name=file_doc.name,
        outlet=outlet,
        user=frappe.session.user,
    )

    return success(
        data={
            "job_id": job_id,
            "file_name": file_doc.file_name,
            "total_rows": row_count,
            "status_url": f"/api/method/ice_cream_shop.api.v1.bulk_upload.get_job_status?job_id={job_id}",
        },
        message=f"Upload accepted. Processing {row_count} rows in background."
    )
@frappe.whitelist(methods=["GET"])
@handle_api_errors
def get_job_status(job_id=None):
    """Return the cached record for a bulk upload job.

    Looks up ``bulk_upload:{job_id}`` in the Frappe cache and wraps whatever
    is stored there in a ``success`` response. Responds with ``bad_request``
    when ``job_id`` is missing and ``not_found`` when the cache has no
    (truthy) entry for it.
    """
    if not job_id:
        return bad_request("'job_id' is required.")

    cache_key = f"bulk_upload:{job_id}"
    job_record = frappe.cache.get_value(cache_key)
    if job_record:
        return success(data=job_record)

    # Imported lazily: only the miss path needs this helper.
    from ice_cream_shop.utils.api_response import not_found

    return not_found(f"Job '{job_id}' not found or expired.")

The outlet is sent as a multipart form field, so it arrives in frappe.form_dict; the file itself comes in on frappe.request.files under the key file. Because the job state lives in Redis with a 24-hour TTL, the status endpoint is a cheap cache read — no extra DocType needed just to track progress.

This is the worker frappe.enqueue calls. It runs as the uploading user (frappe.set_user), parses the file with csv.DictReader, validates each row, then groups rows by (customer, date, payment_mode) so each group becomes a single coherent Sales Invoice. The frappe.db.commit() and frappe.db.rollback() calls inside the invoice loop are the heart of the durability strategy — commit after every successful invoice, roll back a failing one individually.

ice_cream_shop/api/v1/bulk_upload.py (continued)
import csv
import io
_JOB_TTL_SEC = 86400  # job records live for 24 hours
_PROGRESS_EVERY = 100  # persist/publish progress once per this many rows
_MAX_REPORTED_ERRORS = 100  # cap on errors kept in the job record
_REQUIRED_FIELDS = ("customer", "item_code", "qty", "rate", "payment_mode", "date")


def _save_job(job_id, job_data):
    """Write the job record to the cache under bulk_upload:{job_id}."""
    frappe.cache.set_value(f"bulk_upload:{job_id}", job_data, expires_in_sec=_JOB_TTL_SEC)


def _clean_row(raw_row):
    """Return the row with lower-cased, stripped keys and stripped values.

    csv.DictReader files surplus cells under the key ``None`` and fills
    missing cells with ``None``; both are handled here so a ragged row yields
    a readable validation error rather than an AttributeError.
    """
    if None in raw_row:
        raise ValueError("Row has more columns than the header")
    return {
        (key or "").strip().lower(): (value or "").strip()
        for key, value in raw_row.items()
    }


def _parse_number(row, field):
    """Parse row[field] as a finite float or raise ValueError naming the field."""
    import math

    text = row[field]
    try:
        value = float(text)
    except ValueError:
        raise ValueError(f"{field} must be a number, got '{text}'") from None
    # float() accepts "nan" and "inf", which would slip past range checks.
    if not math.isfinite(value):
        raise ValueError(f"{field} must be a finite number, got '{text}'")
    return value


def _group_valid_rows(rows, job_id, user, job_data):
    """Validate every row and group the good ones into one bucket per invoice.

    Returns ``(groups, row_errors)`` where ``groups`` maps
    ``(customer, posting_date, payment_mode)`` to a list of item dicts and
    ``row_errors`` lists one entry per rejected row.
    """
    from collections import defaultdict
    from frappe.utils import getdate

    total = len(rows)
    groups = defaultdict(list)
    row_errors = []
    item_exists = {}  # item_code -> bool, so each code hits the DB once

    for idx, raw_row in enumerate(rows, start=2):  # start=2 because row 1 is header
        row = raw_row
        try:
            row = _clean_row(raw_row)
            for field in _REQUIRED_FIELDS:
                if not row.get(field):
                    raise ValueError(f"Missing '{field}'")

            qty = _parse_number(row, "qty")
            rate = _parse_number(row, "rate")
            if qty <= 0:
                raise ValueError(f"qty must be positive, got {qty}")
            if rate < 0:
                raise ValueError(f"rate cannot be negative, got {rate}")

            item_code = row["item_code"]
            if item_code not in item_exists:
                item_exists[item_code] = bool(frappe.db.exists("Item", item_code))
            if not item_exists[item_code]:
                raise ValueError(f"Item '{item_code}' not found")

            posting_date = getdate(row["date"])
            group_key = (row["customer"], str(posting_date), row["payment_mode"])
            groups[group_key].append({
                "item_code": item_code,
                "qty": qty,
                "rate": rate,
                "row_number": idx,
            })
        except Exception as e:
            # Collected, not raised: one malformed line must not abort the run.
            row_errors.append({
                "row": idx,
                "error": str(e),
                "data": {str(key): value for key, value in row.items()},
            })

        # idx is the file line number; the number of data rows handled is idx - 1.
        done = idx - 1
        if done % _PROGRESS_EVERY == 0:
            job_data.update({
                "processed": done,
                "failed": len(row_errors),
                "errors": row_errors[:_MAX_REPORTED_ERRORS],
            })
            _save_job(job_id, job_data)  # lets the polling endpoint see progress
            frappe.publish_realtime(
                "bulk_upload_progress",
                {"job_id": job_id, "processed": done, "total": total},
                user=user,
            )

    return groups, row_errors


def _create_invoice(customer, posting_date, payment_mode, items, outlet, outlet_doc, warehouse, company):
    """Insert and submit one POS Sales Invoice for a group of rows; return it."""
    si = frappe.new_doc("Sales Invoice")
    si.customer = customer
    si.posting_date = posting_date
    si.company = company
    si.selling_price_list = outlet_doc.custom_selling_price_list or "Standard Selling"
    si.set_warehouse = warehouse
    si.custom_outlet = outlet
    si.custom_order_source = "CSV Upload"
    si.is_pos = 1
    si.update_stock = 1
    for item in items:
        si.append("items", {
            "item_code": item["item_code"],
            "qty": item["qty"],
            "rate": item["rate"],
            "warehouse": warehouse,
        })
    # Amount is filled in below, once the totals have been calculated.
    si.append("payments", {"mode_of_payment": payment_mode, "amount": 0})
    si.flags.ignore_permissions = True
    si.insert()
    si.calculate_taxes_and_totals()
    si.payments[0].amount = si.grand_total
    si.paid_amount = si.grand_total
    si.save()
    si.submit()
    return si


def _run_import(job_id, file_name, outlet, user, job_data):
    """Parse the stored CSV, create the invoices and publish the final status."""
    # Read the file back out of the File DocType
    file_doc = frappe.get_doc("File", file_name)
    raw = file_doc.get_content()
    # Accept bytes or already-decoded text, so this works whichever get_content() returns.
    content = raw.decode("utf-8-sig") if isinstance(raw, bytes) else raw.lstrip("\ufeff")
    rows = list(csv.DictReader(io.StringIO(content)))
    total = len(rows)

    # Outlet details
    outlet_doc = frappe.get_doc("ScoopJoy Outlet", outlet)
    warehouse = outlet_doc.default_warehouse
    company = frappe.db.get_value("Warehouse", warehouse, "company")

    groups, row_errors = _group_valid_rows(rows, job_id, user, job_data)

    # Create invoices from groups
    invoices_created = []
    group_errors = []
    for (customer, posting_date, payment_mode), items in groups.items():
        try:
            si = _create_invoice(
                customer, posting_date, payment_mode, items,
                outlet, outlet_doc, warehouse, company,
            )
            frappe.db.commit()  # commit per invoice to keep transactions short
        except Exception as e:
            frappe.db.rollback()  # one bad group doesn't sink the batch
            group_errors.append({
                "row": [item["row_number"] for item in items],
                "error": str(e),
            })
        else:
            # Recorded only after the commit succeeded.
            invoices_created.append({
                "invoice": si.name,
                "customer": customer,
                "date": posting_date,
                "total": si.grand_total,
                "items": len(items),
            })

        job_data.update({
            "processed": total,
            "succeeded": len(invoices_created),
            "failed": len(row_errors) + len(group_errors),
            "errors": (row_errors + group_errors)[:_MAX_REPORTED_ERRORS],
        })
        _save_job(job_id, job_data)

    # Combine all errors
    all_errors = row_errors + group_errors

    # Update final status
    final_status = {
        "status": "completed",
        "total_rows": total,
        "processed": total,
        "succeeded": len(invoices_created),
        "failed": len(all_errors),
        "invoices_created": invoices_created,
        "errors": all_errors[:_MAX_REPORTED_ERRORS],  # cap at 100 errors
    }
    _save_job(job_id, final_status)
    frappe.publish_realtime(
        "bulk_upload_complete",
        {"job_id": job_id, **final_status},
        user=user,
    )

    # Clean up the uploaded file (best effort: a failure is logged, not raised)
    try:
        file_doc.delete(ignore_permissions=True)
    except Exception:
        frappe.log_error(
            title="Bulk upload: could not delete uploaded CSV",
            message=frappe.get_traceback(),
        )

    return final_status


def process_sales_csv(job_id, file_name, outlet, user):
    """
    Background job: parse CSV and create Sales Invoices.

    Runs as ``user``. Rows are validated one by one, grouped by
    (customer, date, payment_mode), and each group is posted as a single
    Sales Invoice that is committed or rolled back on its own. Progress is
    written to the ``bulk_upload:{job_id}`` cache record and pushed via
    ``frappe.publish_realtime`` (``bulk_upload_progress`` every 100 rows,
    ``bulk_upload_complete`` at the end).

    Args:
        job_id: Identifier of the job record in the cache.
        file_name: Name of the File document holding the uploaded CSV.
        outlet: Name of the ScoopJoy Outlet the sales belong to.
        user: User to run as and to address realtime events to.

    Returns:
        The final status dict (status, total_rows, processed, succeeded,
        failed, invoices_created, errors).

    Raises:
        Exception: any error outside per-row / per-invoice handling is
            re-raised after the job record has been marked ``failed``.
    """
    frappe.set_user(user)

    # The record may have expired or been evicted; start from an empty one.
    job_data = frappe.cache.get_value(f"bulk_upload:{job_id}") or {}
    job_data["status"] = "processing"
    _save_job(job_id, job_data)

    try:
        return _run_import(job_id, file_name, outlet, user, job_data)
    except Exception as exc:
        # Without this the record would read "processing" until the TTL expires.
        frappe.db.rollback()
        fatal = {"row": None, "error": f"Job aborted: {exc}"}
        errors = [fatal, *(job_data.get("errors") or [])]
        job_data.update({
            "status": "failed",
            "failed": job_data.get("failed", 0) + 1,
            "errors": errors[:_MAX_REPORTED_ERRORS],
        })
        _save_job(job_id, job_data)
        frappe.publish_realtime(
            "bulk_upload_complete",
            {"job_id": job_id, **job_data},
            user=user,
        )
        raise

Validation errors are collected per row rather than raised, so one malformed line never aborts the run. Progress is pushed every 100 rows on the bulk_upload_progress channel, and a final bulk_upload_complete event carries the counts plus the first 100 errors.

Upload with a multipart form, then poll the returned status_url. The response is wrapped in Frappe’s standard message envelope.

Terminal window
# --- Upload CSV ---
# Sends the outlet and the CSV as multipart form fields; the file must use the key "file".
# Replace the token with your own <api_key>:<api_secret>.
curl -X POST 'https://erp.scoopjoy.com/api/method/ice_cream_shop.api.v1.bulk_upload.upload_daily_sales' \
-H 'Authorization: token abc1234xyz:secret9876' \
-F 'outlet=SJ-MUM-001' \
-F 'file=@daily_sales_march15.csv'
# Response:
# {
#   "message": {
#     "status": "success",
#     "data": {
#       "job_id": "a1b2c3d4e5f6",
#       "file_name": "daily_sales_SJ-MUM-001_2025-03-15.csv",
#       "total_rows": 847,
#       "status_url": "/api/method/ice_cream_shop.api.v1.bulk_upload.get_job_status?job_id=a1b2c3d4e5f6"
#     },
#     "message": "Upload accepted. Processing 847 rows in background."
#   }
# }
# --- Check Job Status ---
# Poll the status_url returned by the upload call, using the same token.
curl -X GET 'https://erp.scoopjoy.com/api/method/ice_cream_shop.api.v1.bulk_upload.get_job_status?job_id=a1b2c3d4e5f6' \
-H 'Authorization: token abc1234xyz:secret9876'
# Response (in progress):
# NOTE(review): the worker as shown writes the job record when it starts and
# when it finishes; confirm that intermediate counters are persisted before
# relying on the in-progress values below while polling.
# {
#   "message": {
#     "status": "success",
#     "data": {
#       "status": "processing",
#       "total_rows": 847,
#       "processed": 400,
#       "succeeded": 12,
#       "failed": 2,
#       "errors": [...]
#     }
#   }
# }

A sample CSV looks like this:

daily_sales_march15.csv
customer,item_code,qty,rate,payment_mode,date
Walk-in Customer,SCOOP-VAN-001,2,150.00,Cash,2025-03-15
Walk-in Customer,SCOOP-CHOC-001,1,160.00,Cash,2025-03-15
Rajesh Sharma,SUNDAE-001,1,250.00,UPI,2025-03-15
Walk-in Customer,CONE-WAF-001,3,80.00,Cash,2025-03-15

In Express you might open an SSE stream or poll an endpoint; in Frappe the realtime channel is already wired through Socket.IO, so the client just subscribes to the two events the worker publishes. Register this script via hooks.py with app_include_js.

ice_cream_shop/public/js/bulk_upload_progress.js
// Include via hooks.py: app_include_js = ["/assets/ice_cream_shop/js/bulk_upload_progress.js"]

// Progress events: update the desk progress dialog for the running job.
frappe.realtime.on("bulk_upload_progress", (data) => {
	// Avoid a NaN percentage if an event ever arrives with total === 0.
	const pct = data.total > 0 ? Math.round((data.processed / data.total) * 100) : 0;
	frappe.show_progress(
		__("Processing CSV Upload ({0})", [data.job_id]),
		data.processed,
		data.total,
		__("{0}% complete ({1}/{2} rows)", [pct, data.processed, data.total])
	);
});

// Completion event: close the progress dialog and summarise the outcome.
frappe.realtime.on("bulk_upload_complete", (data) => {
	frappe.hide_progress();

	const has_errors = data.failed > 0;
	// Values go in as {0}/{1} placeholders: a template literal would be
	// interpolated before __() runs, so the translation lookup would never match.
	const message = has_errors
		? __("Created {0} invoices. {1} errors found.", [data.succeeded, data.failed])
		: __("Successfully created {0} invoices.", [data.succeeded]);

	frappe.msgprint({
		title: __("CSV Upload Complete"),
		message: message,
		indicator: has_errors ? "orange" : "green",
	});
});