Bulk Data Operations
Problem: Import 50,000 historical ScoopJoy transactions from a CSV file without timing out, crashing, or corrupting data.
Solution: A complete bulk import pipeline with chunked processing, progress tracking, validation, and error reporting. The work happens in three files: a server-side import pipeline, a benchmark to prove the throughput, and a client page that triggers the job and shows progress.
- scoopjoy/scoopjoy/
  - utils/
    - bulk_import.py: the import pipeline (validate → chunk → insert)
    - bulk_benchmark.py: compares insert methods
  - public/js/
    - bulk_import.js: the trigger page with a progress bar
The import pipeline
The whole job runs in a background worker so an HTTP request never blocks on a
50,000-row import. In Express you would push this onto a Bull queue; in Frappe
you hand it to frappe.enqueue on the long queue. The pipeline reads the CSV,
validates every row in a separate pass, then inserts the survivors in chunks —
falling back to row-by-row only when a chunk fails, so one bad row never sinks
the whole batch.
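The control flow described above can be sketched without Frappe. This is a minimal illustration rather than the pipeline itself: insert_many and insert_one are hypothetical stand-ins for a batch write and a single-row write, and a row counts as bad when it is None.

```python
from typing import Iterable, List


def chunks(rows: List, size: int) -> Iterable[List]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def import_in_chunks(rows, insert_many, insert_one, size=1000):
    """Try each chunk as one batch; on failure, retry that chunk row by row."""
    inserted, failed = 0, []
    for chunk in chunks(rows, size):
        try:
            insert_many(chunk)
            inserted += len(chunk)
        except Exception:
            # Only this chunk pays the slow path; other chunks stay batched.
            for row in chunk:
                try:
                    insert_one(row)
                    inserted += 1
                except Exception as exc:
                    failed.append((row, str(exc)))
    return inserted, failed


# Hypothetical stand-ins for the real write functions.
store = []


def insert_one(row):
    if row is None:
        raise ValueError("bad row")
    store.append(row)


def insert_many(batch):
    # All-or-nothing: reject the batch before writing any of it.
    if any(row is None for row in batch):
        raise ValueError("batch contains a bad row")
    store.extend(batch)


inserted, failed = import_in_chunks([1, 2, None, 4, 5], insert_many, insert_one, size=2)
```

With a chunk size of 2, only the middle chunk falls back to row-by-row inserts, and the single bad row is the only one reported as failed.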
The entry point is whitelisted and only enqueues — it returns immediately. Note
the frappe.has_permission(...) gate: bulk_insert skips permission checks, so
you must check up front yourself.
```python
import csv
import io

import frappe
from frappe.model.document import bulk_insert
from frappe.utils import flt, getdate, now


@frappe.whitelist()
def enqueue_bulk_import(file_url):
    """
    Entry point: enqueue the import as a background job.
    Called from a custom button on a page or form.
    """
    # bulk_insert skips permission checks, so gate on the DocType being created.
    frappe.has_permission("Historical Transaction", "create", throw=True)

    frappe.enqueue(
        "scoopjoy.scoopjoy.utils.bulk_import.run_bulk_import",
        file_url=file_url,
        queue="long",
        timeout=3600,  # 1 hour max
        job_id=f"bulk_import_{frappe.utils.now_datetime().isoformat()}",
    )

    return {"status": "queued", "message": "Import job has been queued. Check progress bar."}
```

The worker function orchestrates the four steps. The abort threshold is the key safety valve: if more than 10% of rows fail validation, the import bails out before inserting anything rather than committing a half-broken dataset.
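The threshold rule is simple arithmetic, and it helps to pin down the denominator. The sketch below measures failures against the total number of rows read, which is how the sentence above states the rule; should_abort is a hypothetical helper name, not part of the pipeline.

```python
def should_abort(error_count: int, total_rows: int, threshold: float = 0.10) -> bool:
    """Return True when the share of failed rows exceeds the threshold."""
    if total_rows == 0:
        return False  # an empty file has nothing to abort
    return error_count / total_rows > threshold


# 50,000 rows: exactly 10% failing is tolerated, one more row is not.
at_limit = should_abort(5_000, 50_000)
over_limit = should_abort(5_001, 50_000)
```

Comparing against the total rather than against the surviving rows keeps the rule stable: the cutoff does not shift as more rows fail. The worker function follows.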
```python
def run_bulk_import(file_url):
    """Main import function running in a background worker."""
    frappe.publish_progress(0, title="Bulk Import", description="Starting...")

    # Step 1: Read the file row by row
    rows = _read_csv_streaming(file_url)

    # Step 2: Validate all rows first (separate pass)
    valid_rows, errors = _validate_rows(rows)

    if errors:
        _report_validation_errors(errors)
        # Persist the error report so a later rollback cannot discard it
        frappe.db.commit()
        # Abort when more than 10% of all rows failed validation
        if len(errors) > len(rows) * 0.1:
            frappe.publish_progress(
                100, title="Bulk Import",
                description=f"Aborted: {len(errors)} validation errors (>10% threshold)",
            )
            return

    # Step 3: Bulk insert in chunks
    total = len(valid_rows)
    chunk_size = 1000
    inserted = 0
    insert_errors = []

    for i in range(0, total, chunk_size):
        chunk = valid_rows[i:i + chunk_size]

        try:
            _insert_chunk(chunk)
            inserted += len(chunk)
        except Exception:
            # Discard whatever the failed batch wrote, then fall back to
            # row-by-row for this chunk to isolate bad rows
            frappe.db.rollback()
            chunk_inserted, chunk_errors = _insert_chunk_safe(chunk)
            inserted += chunk_inserted
            insert_errors.extend(chunk_errors)

        # Progress update
        pct = int((i + len(chunk)) / total * 100)
        frappe.publish_progress(
            pct, title="Bulk Import",
            description=f"Inserted {inserted}/{total} records...",
        )

    # Step 4: Final report
    _create_import_log(total, inserted, errors, insert_errors)

    frappe.publish_progress(
        100, title="Bulk Import",
        description=f"Complete: {inserted}/{total} inserted, {len(errors) + len(insert_errors)} errors",
    )
```

The CSV is read row by row through csv.DictReader rather than parsed from one large string, with a hard cap of 100,000 rows as a runaway guard. The rows are still collected into a list, because the validation pass needs all of them, so memory use grows with the row count.
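If holding every row in memory ever becomes the bottleneck, one option is a generator that yields fixed-size batches so only one batch is alive at a time. The sketch below is an alternative under that assumption, not what the pipeline does; iter_batches and its parameters are hypothetical names, and it reads from an in-memory string so it runs anywhere.

```python
import csv
import io
from itertools import islice
from typing import Dict, Iterator, List, TextIO


def iter_batches(handle: TextIO, batch_size: int, max_rows: int) -> Iterator[List[Dict[str, str]]]:
    """Yield lists of up to batch_size rows, stopping after max_rows rows in total."""
    reader = islice(csv.DictReader(handle), max_rows)  # runaway guard
    while True:
        batch = list(islice(reader, batch_size))
        if not batch:
            return
        yield batch


sample = io.StringIO("item_code,qty\nA,1\nB,2\nC,3\nD,4\nE,5\n")
batches = list(iter_batches(sample, batch_size=2, max_rows=4))
```

The trade-off is that a validate-everything-first pass no longer fits naturally: you would either validate per batch or read the file twice. The pipeline's own reader follows.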
```python
def _read_csv_streaming(file_url):
    """Read the CSV row by row, capped at 100,000 rows."""
    file_doc = frappe.get_doc("File", {"file_url": file_url})
    file_path = file_doc.get_full_path()

    max_rows = 100000
    rows = []
    with open(file_path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            # Safety: don't load more than 100K rows (check before appending)
            if i >= max_rows:
                frappe.logger().warning("Bulk import: truncated at 100,000 rows")
                break
            rows.append(row)

    return rows
```

Validation runs as a single pass over all rows. The trick to keeping it fast is pre-fetching the valid link targets (outlets, items, customers) into Python sets once, so each row checks membership in memory instead of hitting the database.
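The membership-check idea can be shown in isolation. In this sketch the sets are hard-coded sample data standing in for the values fetched from the database, and validate_links is a hypothetical, trimmed-down helper that checks only two of the link fields.

```python
def validate_links(rows, valid_outlets, valid_items):
    """Split rows into (valid, errors) using in-memory set membership."""
    valid, errors = [], []
    for line_no, row in enumerate(rows, start=2):  # line 1 is the CSV header
        problems = []
        if row.get("franchise_outlet") not in valid_outlets:
            problems.append(f"Unknown outlet: {row.get('franchise_outlet')}")
        if row.get("item_code") not in valid_items:
            problems.append(f"Unknown item: {row.get('item_code')}")
        if problems:
            errors.append({"row": line_no, "errors": problems})
        else:
            valid.append(row)
    return valid, errors


# Hypothetical sample data in place of the pre-fetched link targets.
outlets = {"OUTLET-001", "OUTLET-002"}
items = {"ITEM-001"}
rows = [
    {"franchise_outlet": "OUTLET-001", "item_code": "ITEM-001"},
    {"franchise_outlet": "OUTLET-999", "item_code": "ITEM-001"},
]
valid, errors = validate_links(rows, outlets, items)
```

Each lookup is a hash probe, so the cost per row stays flat no matter how many outlets or items exist. The full validator follows.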
```python
def _validate_rows(rows):
    """
    Validate all rows before inserting any.
    Returns (valid_rows, errors).
    """
    valid = []
    errors = []

    # Pre-fetch valid values for validation (single queries)
    valid_outlets = set(frappe.get_all("Franchise Outlet", pluck="name"))
    valid_items = set(frappe.get_all("Item", pluck="name"))
    valid_customers = set(frappe.get_all("Customer", pluck="name"))

    for i, row in enumerate(rows):
        row_num = i + 2  # +2 for 1-indexed + header row
        row_errors = []

        # Required fields
        if not row.get("posting_date"):
            row_errors.append("posting_date is required")
        else:
            try:
                getdate(row["posting_date"])
            except Exception:
                row_errors.append(f"Invalid date: {row['posting_date']}")

        if not row.get("franchise_outlet"):
            row_errors.append("franchise_outlet is required")
        elif row["franchise_outlet"] not in valid_outlets:
            row_errors.append(f"Unknown outlet: {row['franchise_outlet']}")

        if not row.get("customer"):
            row_errors.append("customer is required")
        elif row["customer"] not in valid_customers:
            row_errors.append(f"Unknown customer: {row['customer']}")

        if not row.get("item_code"):
            row_errors.append("item_code is required")
        elif row["item_code"] not in valid_items:
            row_errors.append(f"Unknown item: {row['item_code']}")

        # flt() turns blanks and non-numeric text into 0; negatives are rejected too
        if flt(row.get("qty")) <= 0:
            row_errors.append("qty must be > 0")

        if flt(row.get("rate")) <= 0:
            row_errors.append("rate must be > 0")

        if row_errors:
            errors.append({"row": row_num, "data": row, "errors": row_errors})
        else:
            valid.append(row)

    return valid, errors
```

The fast path is _insert_chunk. It builds in-memory docs, stamps the fields
that bulk_insert will not generate for you (name, creation, modified,
owner, modified_by), then writes them in batches. bulk_insert skips
hooks, autoname, and validation. That is exactly why it is fast, and why the
separate validation pass above is mandatory.
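One consequence of passing ignore_duplicates=True is easier to see in isolation. The sketch below uses SQLite's INSERT OR IGNORE from the standard library as a stand-in. It assumes the Frappe flag behaves like a database-level ignore-on-duplicate-key insert, which is worth verifying for your Frappe version and database. The table and column names are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE txn (name TEXT PRIMARY KEY, amount REAL)")

# Three rows, but "a1" appears twice.
batch = [("a1", 10.0), ("b2", 20.0), ("a1", 99.0)]

before = conn.total_changes
conn.executemany("INSERT OR IGNORE INTO txn (name, amount) VALUES (?, ?)", batch)
conn.commit()
written = conn.total_changes - before  # rows actually written

stored = conn.execute("SELECT name, amount FROM txn ORDER BY name").fetchall()
```

Three rows go in, two are written, and no exception is raised. If bulk_insert behaves the same way, a counter that adds len(chunk) after a successful call can overstate the number of rows written whenever generated names collide. The fast path itself follows.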
```python
def _insert_chunk(chunk):
    """
    Bulk insert a chunk using frappe's bulk_insert.
    No hooks, no validation -- raw speed.
    """
    docs = []
    timestamp = now()

    for row in chunk:
        doc = frappe.new_doc("Historical Transaction")
        doc.posting_date = getdate(row["posting_date"])
        doc.franchise_outlet = row["franchise_outlet"]
        doc.customer = row["customer"]
        doc.item_code = row["item_code"]
        doc.item_name = row.get("item_name", "")
        doc.qty = flt(row["qty"])
        doc.rate = flt(row["rate"])
        doc.amount = flt(row["qty"]) * flt(row["rate"])
        doc.order_type = row.get("order_type", "Dine-In")
        doc.source_file = row.get("_source_file", "")

        # Required fields for bulk_insert (no autoname/hooks)
        doc.name = frappe.generate_hash(length=10)
        doc.creation = timestamp
        doc.modified = timestamp
        doc.owner = frappe.session.user
        doc.modified_by = frappe.session.user

        docs.append(doc)

    # bulk_insert: roughly 15x faster than individual doc.insert() calls
    # in the sample benchmark in this recipe
    bulk_insert("Historical Transaction", docs, ignore_duplicates=True, chunk_size=500)
    frappe.db.commit()
```

When a chunk insert throws, the pipeline drops to _insert_chunk_safe, which
inserts row by row through the normal ORM so it can pinpoint and skip the
offending rows while keeping the good ones.
```python
def _insert_chunk_safe(chunk):
    """Fallback: row-by-row insert to isolate failures."""
    inserted = 0
    errors = []

    for row in chunk:
        try:
            doc = frappe.new_doc("Historical Transaction")
            doc.posting_date = getdate(row["posting_date"])
            doc.franchise_outlet = row["franchise_outlet"]
            doc.customer = row["customer"]
            doc.item_code = row["item_code"]
            doc.item_name = row.get("item_name", "")
            doc.qty = flt(row["qty"])
            doc.rate = flt(row["rate"])
            doc.amount = flt(row["qty"]) * flt(row["rate"])
            doc.order_type = row.get("order_type", "Dine-In")
            doc.insert(ignore_permissions=True)
            inserted += 1

            if inserted % 100 == 0:
                frappe.db.commit()

        except Exception as e:
            errors.append({"data": row, "error": str(e)})

    frappe.db.commit()
    return inserted, errors
```

Validation failures are written to a downloadable CSV and pushed to the user over realtime, so the importer gets an actionable error report instead of a silent partial import.
```python
def _report_validation_errors(errors):
    """Save validation errors to a file for user download."""
    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["Row", "Errors", "posting_date", "franchise_outlet", "customer", "item_code"])

    for err in errors:
        writer.writerow([
            err["row"],
            "; ".join(err["errors"]),
            err["data"].get("posting_date", ""),
            err["data"].get("franchise_outlet", ""),
            err["data"].get("customer", ""),
            err["data"].get("item_code", ""),
        ])

    # Save as a file
    content = output.getvalue()
    file_doc = frappe.get_doc({
        "doctype": "File",
        "file_name": f"import_errors_{frappe.utils.now_datetime().isoformat()}.csv",
        "content": content,
        "is_private": 1,
    })
    file_doc.save(ignore_permissions=True)

    frappe.publish_realtime(
        "bulk_import_errors",
        {"file_url": file_doc.file_url, "error_count": len(errors)},
        user=frappe.session.user,
    )


def _create_import_log(total, inserted, validation_errors, insert_errors):
    """Create a log document for audit trail."""
    frappe.get_doc({
        "doctype": "Comment",
        "comment_type": "Info",
        "reference_doctype": "Historical Transaction",
        "reference_name": "Historical Transaction",
        "content": (
            f"Bulk Import Complete:\n"
            f"- Total rows: {total}\n"
            f"- Inserted: {inserted}\n"
            f"- Validation errors: {len(validation_errors)}\n"
            f"- Insert errors: {len(insert_errors)}\n"
            f"- Timestamp: {now()}\n"
            f"- User: {frappe.session.user}"
        ),
    }).insert(ignore_permissions=True)
```

Benchmark: insert methods compared

To prove the throughput claim, this script races three approaches over the same record count. Run it from the bench console to see the numbers on your own hardware.
```python
import time

import frappe
from frappe.model.document import bulk_insert


def benchmark_insert_methods(n=5000):
    """
    Run from bench console:
    >>> from scoopjoy.scoopjoy.utils.bulk_benchmark import benchmark_insert_methods
    >>> benchmark_insert_methods(5000)

    Run this on a test site: each cleanup step deletes every
    Historical Transaction whose outlet is OUTLET-001.
    """
    results = {}

    # --- Method 1: Loop with doc.insert() ---
    start = time.perf_counter()
    for i in range(n):
        doc = frappe.new_doc("Historical Transaction")
        doc.posting_date = "2025-01-01"
        doc.franchise_outlet = "OUTLET-001"
        doc.customer = "CUST-001"
        doc.item_code = "ITEM-001"
        doc.qty = 1
        doc.rate = 100
        doc.amount = 100
        doc.insert(ignore_permissions=True)
        if i % 500 == 0:
            frappe.db.commit()
    frappe.db.commit()
    results["doc.insert() loop"] = time.perf_counter() - start

    # Cleanup
    frappe.db.sql("DELETE FROM `tabHistorical Transaction` WHERE franchise_outlet = 'OUTLET-001'")
    frappe.db.commit()

    # --- Method 2: frappe.model.document.bulk_insert ---
    start = time.perf_counter()
    docs = []
    timestamp = frappe.utils.now()
    for i in range(n):
        doc = frappe.new_doc("Historical Transaction")
        doc.posting_date = "2025-01-01"
        doc.franchise_outlet = "OUTLET-001"
        doc.customer = "CUST-001"
        doc.item_code = "ITEM-001"
        doc.qty = 1
        doc.rate = 100
        doc.amount = 100
        doc.name = frappe.generate_hash(length=10)
        doc.creation = timestamp
        doc.modified = timestamp
        doc.owner = "Administrator"
        doc.modified_by = "Administrator"
        docs.append(doc)
    bulk_insert("Historical Transaction", docs, chunk_size=1000)
    frappe.db.commit()
    results["bulk_insert()"] = time.perf_counter() - start

    # Cleanup
    frappe.db.sql("DELETE FROM `tabHistorical Transaction` WHERE franchise_outlet = 'OUTLET-001'")
    frappe.db.commit()

    # --- Method 3: Raw SQL, one multi-row INSERT per chunk ---
    start = time.perf_counter()
    values = []
    timestamp = frappe.utils.now()
    for i in range(n):
        name = frappe.generate_hash(length=10)
        values.append((
            name, "2025-01-01", "OUTLET-001", "CUST-001", "ITEM-001",
            1, 100, 100, timestamp, timestamp, "Administrator", "Administrator"
        ))

    # frappe.db.sql executes a single statement per call, so each chunk is
    # sent as one INSERT with a placeholder group per row and flattened values.
    row_placeholder = "(" + ", ".join(["%s"] * 12) + ")"
    for j in range(0, len(values), 1000):
        chunk = values[j:j + 1000]
        placeholders = ", ".join([row_placeholder] * len(chunk))
        flat_values = [value for row in chunk for value in row]
        frappe.db.sql(f"""
            INSERT INTO `tabHistorical Transaction`
            (name, posting_date, franchise_outlet, customer, item_code,
             qty, rate, amount, creation, modified, owner, modified_by)
            VALUES {placeholders}
        """, flat_values)
    frappe.db.commit()
    results["raw SQL executemany"] = time.perf_counter() - start

    # Cleanup
    frappe.db.sql("DELETE FROM `tabHistorical Transaction` WHERE franchise_outlet = 'OUTLET-001'")
    frappe.db.commit()

    # --- Print results ---
    print(f"\nBenchmark: {n} records")
    print("-" * 50)
    for method, elapsed in sorted(results.items(), key=lambda x: x[1]):
        rate = n / elapsed
        print(f"  {method:30s} {elapsed:8.2f}s  ({rate:,.0f} records/sec)")

    return results
```

The relative ordering is the lesson: raw SQL is fastest, bulk_insert comes
second at about half that throughput while being far more ergonomic, and a
doc.insert() loop is roughly 30x slower than raw SQL (about 15x slower than
bulk_insert) because it pays ORM, hook, and autoname overhead on every row.
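The ratios can be checked directly against the sample timings listed in this section. The sketch below only does that arithmetic; the dictionary restates the 5,000-record figures, and speedup is a hypothetical helper name.

```python
# Seconds for 5,000 records, copied from the sample run in this section.
timings_5000 = {
    "raw SQL executemany": 0.42,
    "bulk_insert()": 0.85,
    "doc.insert() loop": 12.50,
}


def speedup(slow: str, fast: str, timings: dict) -> float:
    """How many times longer `slow` takes than `fast`."""
    return timings[slow] / timings[fast]


loop_vs_raw = round(speedup("doc.insert() loop", "raw SQL executemany", timings_5000), 1)
loop_vs_bulk = round(speedup("doc.insert() loop", "bulk_insert()", timings_5000), 1)

# Projecting the loop to 50,000 records at the same per-record rate.
loop_rate = 5_000 / timings_5000["doc.insert() loop"]  # records per second
projected_loop_50k = 50_000 / loop_rate  # seconds
```

So the loop is about 29.8x slower than raw SQL and about 14.7x slower than bulk_insert, and at 400 records per second it needs 125 seconds for 50,000 records. The sample output these figures come from follows.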
```
Benchmark: 5000 records
--------------------------------------------------
  raw SQL executemany                0.42s  (11,905 records/sec)
  bulk_insert()                      0.85s  (5,882 records/sec)
  doc.insert() loop                 12.50s  (400 records/sec)

For 50,000 records:
  raw SQL executemany                 3.8s  (13,158 records/sec)
  bulk_insert()                       8.5s  (5,882 records/sec)
  doc.insert() loop                 125.0s  (400 records/sec)
```

Client-side: trigger the import with a progress bar

A custom Desk page provides the upload field and start button. The whitelisted
method returns instantly; progress streams in over the realtime channel that
frappe.publish_progress writes to, and the bulk_import_errors listener
surfaces the downloadable error report when validation finds problems.
```javascript
frappe.pages["bulk-import"].on_page_load = function (wrapper) {
  const page = frappe.ui.make_app_page({
    parent: wrapper,
    title: "Bulk Import Historical Transactions",
    single_column: true,
  });

  const upload_field = page.add_field({
    fieldname: "import_file",
    label: __("CSV File"),
    fieldtype: "Attach",
    reqd: 1,
  });

  page.set_primary_action(__("Start Import"), () => {
    const file_url = upload_field.get_value();
    if (!file_url) {
      frappe.msgprint(__("Please attach a CSV file"));
      return;
    }

    frappe.call({
      method: "scoopjoy.scoopjoy.utils.bulk_import.enqueue_bulk_import",
      args: { file_url },
      callback: function (r) {
        if (r.message) {
          frappe.show_alert({
            message: __("Import queued. Watch the progress bar."),
            indicator: "green",
          });
        }
      },
    });
  });

  // Listen for error file notification
  frappe.realtime.on("bulk_import_errors", (data) => {
    frappe.msgprint({
      title: __("Import Errors Found"),
      message: __(
        "{0} rows had validation errors. <a href='{1}'>Download error report</a>.",
        [data.error_count, data.file_url]
      ),
      indicator: "orange",
    });
  });
};
```

Quick reference: performance checklist

| Symptom | Diagnosis | Recipe |
|---|---|---|
| Page loads slowly | N+1 queries in server log | 5.5 |
| Report takes 10+ seconds | Full table scan (check EXPLAIN) | 5.6 |
| Dashboard query is inherently expensive | Need pre-aggregated data | 5.7 |
| Same expensive query runs repeatedly | Add caching layer | 5.4 |
| Bulk import times out | Using doc.insert() in a loop | 5.8 |
| Complex query is hard to maintain as raw SQL | Use frappe.qb | 5.1 |
| Need a visual report with charts | Script Report | 5.2 |
| Need a quick SQL-based report | Query Report | 5.3 |