File Upload & Processing Pipeline
Problem: ScoopJoy outlets submit daily sales as CSV files. You need to accept each upload, validate it, turn the rows into Sales Invoices in bulk, and report progress and errors back to the uploader without timing out the request.
Solution: Split the work into four pieces — a thin upload endpoint that accepts
the file and enqueues a job, a background worker that parses and posts invoices, a
real-time progress channel over WebSocket, and a status endpoint the client can
poll. The endpoint returns a job_id immediately; the heavy lifting happens on a
long queue worker.
flowchart LR
    C["Outlet client"] -->|"POST CSV + outlet"| EP["Upload endpoint"]
    EP -->|"save"| F["File DocType"]
    EP -->|"enqueue"| Q["long queue"]
    EP -->|"job_id"| C
    Q --> W["Background worker"]
    W -->|"create + submit"| SI["Sales Invoices"]
    W -->|"publish_realtime"| WS["Socket.IO"]
    WS -->|"progress / complete"| C
    C -->|"poll status"| ST["get_job_status"]
    ST --> R["Redis cache"]
    W --> R
This recipe lives in one module plus a client script:
ice_cream_shop/
    api/
        v1/
            bulk_upload.py              upload endpoint + background processor
    public/
        js/
            bulk_upload_progress.js     realtime progress listener
Step 1: Upload endpoint
The endpoint does only fast work: argument checks, a header sniff, persisting the
file, and enqueuing the job. It rate-limits to 10 uploads per 5 minutes, validates
that the outlet exists, confirms the file is a .csv, and reads it with
utf-8-sig so a leading BOM from Excel exports is stripped. It then stashes a job
record in Redis (keyed bulk_upload:{job_id}) and returns a status_url for
polling.
import os

import frappe
from frappe import _
from ice_cream_shop.utils.api_response import (
    success,
    bad_request,
    not_found,
    handle_api_errors,
)
from ice_cream_shop.utils.rate_limiter import rate_limit


@frappe.whitelist(methods=["POST"])
@handle_api_errors
@rate_limit(max_requests=10, window_seconds=300)
def upload_daily_sales():
    """
    Upload a CSV of daily sales for batch processing.

    POST /api/method/ice_cream_shop.api.v1.bulk_upload.upload_daily_sales
    Header: Authorization: token <api_key>:<api_secret>
            Content-Type: multipart/form-data
    Body:   file=<daily_sales.csv>, outlet=SJ-MUM-001

    CSV format:
        customer,item_code,qty,rate,payment_mode,date
        Walk-in,SCOOP-VAN-001,2,150.00,Cash,2025-03-15
    """
    outlet = frappe.form_dict.get("outlet")
    if not outlet:
        return bad_request("'outlet' parameter is required.")

    if not frappe.db.exists("ScoopJoy Outlet", outlet):
        return bad_request(f"Outlet '{outlet}' not found.")

    files = frappe.request.files
    if not files or "file" not in files:
        return bad_request("No CSV file uploaded. Send with key 'file'.")

    uploaded_file = files["file"]
    ext = os.path.splitext(uploaded_file.filename)[1].lower()
    if ext != ".csv":
        return bad_request(f"Expected .csv file, got '{ext}'.")

    content = uploaded_file.read().decode("utf-8-sig")  # handle BOM

    # Quick validation: check header row
    lines = content.strip().split("\n")
    if len(lines) < 2:
        return bad_request("CSV must have a header row and at least one data row.")

    header = [col.strip().lower() for col in lines[0].split(",")]
    required_cols = {"customer", "item_code", "qty", "rate", "payment_mode", "date"}
    missing = required_cols - set(header)
    if missing:
        # sorted() keeps the error message stable between requests
        return bad_request(f"Missing required columns: {', '.join(sorted(missing))}")

    row_count = len(lines) - 1

    # Save file for background processing
    file_doc = frappe.get_doc({
        "doctype": "File",
        "file_name": f"daily_sales_{outlet}_{frappe.utils.nowdate()}.csv",
        "content": content.encode("utf-8"),
        "is_private": 1,
    })
    file_doc.save(ignore_permissions=False)

    # Create a processing job tracker in Redis
    job_id = frappe.generate_hash(length=12)
    frappe.cache.set_value(f"bulk_upload:{job_id}", {
        "status": "queued",
        "file": file_doc.name,
        "outlet": outlet,
        "total_rows": row_count,
        "processed": 0,
        "succeeded": 0,
        "failed": 0,
        "errors": [],
        "invoices_created": [],
    }, expires_in_sec=86400)

    # Enqueue background processing.
    # Caveat: recent Frappe versions define job_id as a keyword of frappe.enqueue
    # itself (used for deduplication). If the worker fails with a missing job_id
    # argument, pass the tracker ID under a different keyword name.
    frappe.enqueue(
        "ice_cream_shop.api.v1.bulk_upload.process_sales_csv",
        queue="long",
        timeout=600,
        job_id=job_id,
        file_name=file_doc.name,
        outlet=outlet,
        user=frappe.session.user,
    )

    return success(
        data={
            "job_id": job_id,
            "file_name": file_doc.file_name,
            "total_rows": row_count,
            "status_url": f"/api/method/ice_cream_shop.api.v1.bulk_upload.get_job_status?job_id={job_id}",
        },
        message=f"Upload accepted. Processing {row_count} rows in background."
    )


@frappe.whitelist(methods=["GET"])
@handle_api_errors
def get_job_status(job_id=None):
    """Check the status of a bulk upload job."""
    if not job_id:
        return bad_request("'job_id' is required.")

    status = frappe.cache.get_value(f"bulk_upload:{job_id}")
    if not status:
        return not_found(f"Job '{job_id}' not found or expired.")

    return success(data=status)

The outlet is sent as a multipart form field, so it arrives in frappe.form_dict;
the file itself comes in on frappe.request.files under the key file. Because the
job state lives in Redis with a 24-hour TTL, the status endpoint is a cheap cache
read — no extra DocType needed just to track progress.
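The header sniff above splits on commas and newlines by hand, which is fast but miscounts rows when the file ends in blank lines and misreads quoted header cells. As a hedged alternative, the sketch below does the same check with the standard csv module. The function name sniff_csv is hypothetical and not part of the recipe; it only illustrates the idea outside Frappe.

```python
import csv
import io

REQUIRED_COLS = {"customer", "item_code", "qty", "rate", "payment_mode", "date"}


def sniff_csv(raw: bytes) -> tuple[list[str], int]:
    """Return (missing_columns, data_row_count) for an uploaded CSV payload.

    Hypothetical helper: a stand-in for the inline header check, not recipe code.
    """
    # utf-8-sig drops a leading BOM if present and is a no-op otherwise.
    text = raw.decode("utf-8-sig")
    # newline="" lets the csv module handle \r\n line endings itself.
    reader = csv.reader(io.StringIO(text, newline=""))
    header = next(reader, [])
    normalized = {col.strip().lower() for col in header}
    missing = sorted(REQUIRED_COLS - normalized)
    # Count only rows that contain at least one non-blank cell.
    data_rows = sum(1 for row in reader if any(cell.strip() for cell in row))
    return missing, data_rows


excel_export = (
    b"\xef\xbb\xbfCustomer,Item_Code,qty,rate,payment_mode,date\r\n"
    b"Walk-in,SCOOP-VAN-001,2,150.00,Cash,2025-03-15\r\n"
    b"\r\n"
)
missing, data_rows = sniff_csv(excel_export)
```

With this input the BOM and the trailing blank line are both ignored, so the header passes and exactly one data row is counted.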
Step 2: Background CSV processor
This is the worker frappe.enqueue calls. It runs as the uploading user
(frappe.set_user), parses the file with csv.DictReader, validates each row, then
groups rows by (customer, date, payment_mode) so each group becomes a single
coherent Sales Invoice. The frappe.db.commit() and frappe.db.rollback() calls in
the invoice loop are the heart of the durability strategy: commit after every
successful invoice, roll back a failing one individually.
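To see the commit-or-roll-back-per-invoice pattern in isolation, here is a minimal sketch that swaps in sqlite3 for Frappe's database layer. The table layout and the post_invoices function are assumptions made for illustration only; the point is that a failure halfway through one invoice undoes that invoice's partial writes and leaves earlier commits intact.

```python
import sqlite3


def post_invoices(conn, invoices):
    """Insert each invoice with its items; commit or roll back per invoice."""
    created, errors = [], []
    for inv in invoices:
        try:
            cur = conn.execute(
                "INSERT INTO invoice (customer) VALUES (?)", (inv["customer"],)
            )
            for item_code, qty in inv["items"]:
                conn.execute(
                    "INSERT INTO invoice_item (invoice_id, item_code, qty) "
                    "VALUES (?, ?, ?)",
                    (cur.lastrowid, item_code, qty),
                )
            conn.commit()  # this invoice is now durable
            created.append(inv["customer"])
        except sqlite3.Error as exc:
            conn.rollback()  # discard only the failing invoice's partial writes
            errors.append({"customer": inv["customer"], "error": str(exc)})
    return created, errors


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE invoice (id INTEGER PRIMARY KEY, customer TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE invoice_item ("
    "invoice_id INTEGER NOT NULL, item_code TEXT NOT NULL, "
    "qty REAL NOT NULL CHECK (qty > 0))"
)

created, errors = post_invoices(conn, [
    {"customer": "Walk-in Customer",
     "items": [("SCOOP-VAN-001", 2), ("SCOOP-CHOC-001", 1)]},
    # The second item violates the CHECK constraint, so the whole invoice is undone.
    {"customer": "Rajesh Sharma", "items": [("SUNDAE-001", 1), ("CONE-WAF-001", 0)]},
    {"customer": "Walk-in Customer", "items": [("CONE-WAF-001", 3)]},
])
invoice_count = conn.execute("SELECT COUNT(*) FROM invoice").fetchone()[0]
item_count = conn.execute("SELECT COUNT(*) FROM invoice_item").fetchone()[0]
```

The failing invoice leaves no header row behind, while the invoices before and after it are stored. The trade-off is one transaction per invoice instead of one per batch, which costs some throughput in exchange for short transactions and partial success.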
import csv
import io
from collections import defaultdict

from frappe.utils import getdate


def process_sales_csv(job_id, file_name, outlet, user):
    """
    Background job: parse CSV and create Sales Invoices.
    Reports progress via frappe.publish_realtime.
    """
    frappe.set_user(user)

    job_data = frappe.cache.get_value(f"bulk_upload:{job_id}")
    job_data["status"] = "processing"
    frappe.cache.set_value(f"bulk_upload:{job_id}", job_data, expires_in_sec=86400)

    # Read the file back out of the File DocType
    file_doc = frappe.get_doc("File", file_name)
    content = file_doc.get_content().decode("utf-8-sig")

    reader = csv.DictReader(io.StringIO(content))
    rows = list(reader)
    total = len(rows)

    # Outlet details
    outlet_doc = frappe.get_doc("ScoopJoy Outlet", outlet)
    warehouse = outlet_doc.default_warehouse
    company = frappe.db.get_value("Warehouse", warehouse, "company")

    # Group rows by (customer, date, payment_mode) -> one invoice per group
    groups = defaultdict(list)
    row_errors = []

    for idx, row in enumerate(rows, start=2):  # start=2 because row 1 is header
        try:
            row = {k.strip().lower(): v.strip() for k, v in row.items()}

            for field in ["customer", "item_code", "qty", "rate", "payment_mode", "date"]:
                if not row.get(field):
                    raise ValueError(f"Missing '{field}'")

            qty = float(row["qty"])
            rate = float(row["rate"])
            if qty <= 0:
                raise ValueError(f"qty must be positive, got {qty}")
            if rate < 0:
                raise ValueError(f"rate cannot be negative, got {rate}")

            if not frappe.db.exists("Item", row["item_code"]):
                raise ValueError(f"Item '{row['item_code']}' not found")

            posting_date = getdate(row["date"])

            group_key = (row["customer"], str(posting_date), row["payment_mode"])
            groups[group_key].append({
                "item_code": row["item_code"],
                "qty": qty,
                "rate": rate,
                "row_number": idx,
            })

        except Exception as e:
            row_errors.append({"row": idx, "error": str(e), "data": dict(row)})

        # Publish progress every 100 rows
        if idx % 100 == 0:
            frappe.publish_realtime(
                "bulk_upload_progress",
                {"job_id": job_id, "processed": idx, "total": total},
                user=user,
            )

    # Create invoices from groups
    invoices_created = []
    invoice_errors = []

    for (customer, posting_date, payment_mode), items in groups.items():
        try:
            si = frappe.new_doc("Sales Invoice")
            si.customer = customer
            si.posting_date = posting_date
            si.company = company
            si.selling_price_list = outlet_doc.custom_selling_price_list or "Standard Selling"
            si.set_warehouse = warehouse
            si.custom_outlet = outlet
            si.custom_order_source = "CSV Upload"
            si.is_pos = 1
            si.update_stock = 1

            for item in items:
                si.append("items", {
                    "item_code": item["item_code"],
                    "qty": item["qty"],
                    "rate": item["rate"],
                    "warehouse": warehouse,
                })

            si.append("payments", {"mode_of_payment": payment_mode, "amount": 0})

            si.flags.ignore_permissions = True
            si.insert()
            si.calculate_taxes_and_totals()
            si.payments[0].amount = si.grand_total
            si.paid_amount = si.grand_total
            si.save()
            si.submit()

            invoices_created.append({
                "invoice": si.name,
                "customer": customer,
                "date": posting_date,
                "total": si.grand_total,
                "items": len(items),
            })

            frappe.db.commit()  # commit per invoice to keep transactions short

        except Exception as e:
            frappe.db.rollback()  # one bad group doesn't sink the batch
            affected_rows = [item["row_number"] for item in items]
            invoice_errors.append({
                "customer": customer,
                "date": posting_date,
                "error": str(e),
                "affected_rows": affected_rows,
            })

    # Combine all errors
    all_errors = row_errors + [
        {"row": err["affected_rows"], "error": err["error"]}
        for err in invoice_errors
    ]

    # Update final status
    final_status = {
        "status": "completed",
        "total_rows": total,
        "processed": total,
        "succeeded": len(invoices_created),
        "failed": len(all_errors),
        "invoices_created": invoices_created,
        "errors": all_errors[:100],  # cap at 100 errors
    }
    frappe.cache.set_value(f"bulk_upload:{job_id}", final_status, expires_in_sec=86400)

    frappe.publish_realtime(
        "bulk_upload_complete",
        {"job_id": job_id, **final_status},
        user=user,
    )

    # Clean up the uploaded file
    try:
        file_doc.delete(ignore_permissions=True)
    except Exception:
        pass

    return final_status

Validation errors are collected per row rather than raised, so one malformed line
never aborts the run. Progress is pushed every 100 rows on the
bulk_upload_progress channel, and a final bulk_upload_complete event carries the
counts plus the first 100 errors.
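The collect-then-group step can be exercised without a Frappe site. The sketch below is a simplified stand-in for the first loop of the worker, under two stated assumptions: the Item existence check is omitted because it needs a database, and date.fromisoformat replaces frappe.utils.getdate. The function name validate_and_group is hypothetical, and the last sample row has its qty deliberately set to 0 to trigger an error.

```python
import csv
import io
from collections import defaultdict
from datetime import date

REQUIRED = ("customer", "item_code", "qty", "rate", "payment_mode", "date")


def validate_and_group(rows):
    """Group valid rows by (customer, date, payment_mode); collect row errors."""
    groups = defaultdict(list)
    errors = []
    for row_number, raw in enumerate(rows, start=2):  # row 1 is the header
        try:
            row = {(k or "").strip().lower(): (v or "").strip() for k, v in raw.items()}
            for field in REQUIRED:
                if not row.get(field):
                    raise ValueError(f"Missing '{field}'")
            qty, rate = float(row["qty"]), float(row["rate"])
            if qty <= 0:
                raise ValueError(f"qty must be positive, got {qty}")
            if rate < 0:
                raise ValueError(f"rate cannot be negative, got {rate}")
            # Assumption: ISO dates only; the worker uses frappe.utils.getdate.
            posting_date = date.fromisoformat(row["date"])
            key = (row["customer"], posting_date.isoformat(), row["payment_mode"])
            groups[key].append(
                {"item_code": row["item_code"], "qty": qty, "rate": rate,
                 "row_number": row_number}
            )
        except ValueError as exc:
            # Record the problem and keep going; one bad row never aborts the run.
            errors.append({"row": row_number, "error": str(exc)})
    return dict(groups), errors


sample = """customer,item_code,qty,rate,payment_mode,date
Walk-in Customer,SCOOP-VAN-001,2,150.00,Cash,2025-03-15
Walk-in Customer,SCOOP-CHOC-001,1,160.00,Cash,2025-03-15
Rajesh Sharma,SUNDAE-001,1,250.00,UPI,2025-03-15
Walk-in Customer,CONE-WAF-001,0,80.00,Cash,2025-03-15
"""
groups, errors = validate_and_group(csv.DictReader(io.StringIO(sample)))
```

Here the two valid Walk-in Customer rows collapse into one group, Rajesh Sharma gets a second group, and the zero-quantity row is reported against file row 5 without stopping the others.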
Step 3: curl examples
Upload with a multipart form, then poll the returned status_url. The response is
wrapped in Frappe’s standard message envelope.
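For scripted clients, the polling half might look like the sketch below. It is a minimal illustration, not part of the recipe: unwrap and poll_job are hypothetical names, and fetch_status is whatever callable the caller supplies to GET the status_url and return the decoded JSON. The only things taken from the recipe are the envelope shape and the "completed" status value.

```python
import time
from typing import Callable


def unwrap(envelope: dict) -> dict:
    """Pull the job payload out of the {"message": {"data": ...}} envelope."""
    return envelope["message"]["data"]


def poll_job(
    fetch_status: Callable[[], dict],
    interval: float = 2.0,
    max_attempts: int = 150,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch_status until the job reports "completed" or attempts run out."""
    for _ in range(max_attempts):
        job = unwrap(fetch_status())
        if job["status"] == "completed":
            return job
        sleep(interval)  # still queued or processing; wait before asking again
    raise TimeoutError(f"Job not completed after {max_attempts} status checks")
```

Passing sleep as a parameter keeps the helper easy to test with canned responses. In real use, fetch_status would wrap an HTTP GET against the status_url with the same Authorization header used for the upload.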
# --- Upload CSV ---
curl -X POST 'https://erp.scoopjoy.com/api/method/ice_cream_shop.api.v1.bulk_upload.upload_daily_sales' \
  -H 'Authorization: token abc1234xyz:secret9876' \
  -F 'outlet=SJ-MUM-001' \
  -F 'file=@daily_sales_march15.csv'

# Response:
# {
#   "message": {
#     "status": "success",
#     "data": {
#       "job_id": "a1b2c3d4e5f6",
#       "file_name": "daily_sales_SJ-MUM-001_2025-03-15.csv",
#       "total_rows": 847,
#       "status_url": "/api/method/ice_cream_shop.api.v1.bulk_upload.get_job_status?job_id=a1b2c3d4e5f6"
#     },
#     "message": "Upload accepted. Processing 847 rows in background."
#   }
# }

# --- Check Job Status ---
curl -X GET 'https://erp.scoopjoy.com/api/method/ice_cream_shop.api.v1.bulk_upload.get_job_status?job_id=a1b2c3d4e5f6' \
  -H 'Authorization: token abc1234xyz:secret9876'

# Response (in progress):
# {
#   "message": {
#     "status": "success",
#     "data": {
#       "status": "processing",
#       "total_rows": 847,
#       "processed": 400,
#       "succeeded": 12,
#       "failed": 2,
#       "errors": [...]
#     }
#   }
# }

A sample CSV looks like this:

customer,item_code,qty,rate,payment_mode,date
Walk-in Customer,SCOOP-VAN-001,2,150.00,Cash,2025-03-15
Walk-in Customer,SCOOP-CHOC-001,1,160.00,Cash,2025-03-15
Rajesh Sharma,SUNDAE-001,1,250.00,UPI,2025-03-15
Walk-in Customer,CONE-WAF-001,3,80.00,Cash,2025-03-15

Step 4: Client-side progress listener
In Express you might open an SSE stream or poll an endpoint; in Frappe the realtime
channel is already wired through Socket.IO, so the client just subscribes to the two
events the worker publishes. Register this script via hooks.py with
app_include_js.
// Include via hooks.py (files under public/ are served from /assets/<app>/):
// app_include_js = ["/assets/ice_cream_shop/js/bulk_upload_progress.js"]

frappe.realtime.on("bulk_upload_progress", (data) => {
    const pct = Math.round((data.processed / data.total) * 100);
    frappe.show_progress(
        `Processing CSV Upload (${data.job_id})`,
        data.processed,
        data.total,
        `${pct}% complete (${data.processed}/${data.total} rows)`
    );
});

frappe.realtime.on("bulk_upload_complete", (data) => {
    frappe.hide_progress();
    if (data.failed > 0) {
        frappe.msgprint({
            title: __("CSV Upload Complete"),
            // Pass values as placeholders so the string itself stays translatable
            message: __("Created {0} invoices. {1} errors found.", [data.succeeded, data.failed]),
            indicator: "orange",
        });
    } else {
        frappe.msgprint({
            title: __("CSV Upload Complete"),
            message: __("Successfully created {0} invoices.", [data.succeeded]),
            indicator: "green",
        });
    }
});