Skip to content

Bulk Data Operations

Problem: Import 50,000 historical ScoopJoy transactions from a CSV file without timing out, crashing, or corrupting data.

Solution: A complete bulk import pipeline with chunked processing, progress tracking, validation, and error reporting. The work happens in three files: a server-side import pipeline, a benchmark to prove the throughput, and a client page that triggers the job and shows progress.

  • Directoryscoopjoy/scoopjoy/
    • Directoryutils/
      • bulk_import.py the import pipeline (validate → chunk → insert)
      • bulk_benchmark.py compares insert methods
    • Directorypublic/js/
      • bulk_import.js the trigger page with a progress bar

The whole job runs in a background worker so an HTTP request never blocks on a 50,000-row import. In Express you would push this onto a Bull queue; in Frappe you hand it to frappe.enqueue on the long queue. The pipeline reads the CSV, validates every row in a separate pass, then inserts the survivors in chunks — falling back to row-by-row only when a chunk fails, so one bad row never sinks the whole batch.

The entry point is whitelisted and only enqueues — it returns immediately. Note the frappe.has_permission(...) gate: bulk_insert skips permission checks, so you must check up front yourself.

scoopjoy/scoopjoy/utils/bulk_import.py
import frappe
import csv
import io
from frappe.utils import cstr, getdate, flt, now, cint
from frappe.model.document import bulk_insert
@frappe.whitelist()
def enqueue_bulk_import(file_url):
"""
Entry point: enqueue the import as a background job.
Called from a custom button on a page or form.
"""
frappe.has_permission("Sales Invoice", "create", throw=True)
frappe.enqueue(
"scoopjoy.scoopjoy.utils.bulk_import.run_bulk_import",
file_url=file_url,
queue="long",
timeout=3600, # 1 hour max
job_id=f"bulk_import_{frappe.utils.now_datetime().isoformat()}",
)
return {"status": "queued", "message": "Import job has been queued. Check progress bar."}

The worker function orchestrates the four steps. The abort threshold is the key safety valve: if more than 10% of rows fail validation, the import bails out before inserting anything rather than committing a half-broken dataset.

scoopjoy/scoopjoy/utils/bulk_import.py
def run_bulk_import(file_url):
"""Main import function running in a background worker."""
frappe.publish_progress(0, title="Bulk Import", description="Starting...")
# Step 1: Read file (memory-efficient streaming)
rows = _read_csv_streaming(file_url)
# Step 2: Validate all rows first (separate pass)
valid_rows, errors = _validate_rows(rows)
if errors:
_report_validation_errors(errors)
if len(errors) > len(valid_rows) * 0.1: # >10% error rate
frappe.publish_progress(
100, title="Bulk Import",
description=f"Aborted: {len(errors)} validation errors (>{10}% threshold)"
)
return
# Step 3: Bulk insert in chunks
total = len(valid_rows)
chunk_size = 1000
inserted = 0
insert_errors = []
for i in range(0, total, chunk_size):
chunk = valid_rows[i:i + chunk_size]
try:
_insert_chunk(chunk)
inserted += len(chunk)
except Exception as e:
# Fall back to row-by-row for this chunk to isolate bad rows
chunk_inserted, chunk_errors = _insert_chunk_safe(chunk)
inserted += chunk_inserted
insert_errors.extend(chunk_errors)
# Progress update
pct = int((i + len(chunk)) / total * 100)
frappe.publish_progress(
pct,
title="Bulk Import",
description=f"Inserted {inserted}/{total} records..."
)
# Step 4: Final report
_create_import_log(total, inserted, errors, insert_errors)
frappe.publish_progress(
100, title="Bulk Import",
description=f"Complete: {inserted}/{total} inserted, {len(errors) + len(insert_errors)} errors"
)

Reading the CSV is streamed — the DictReader iterates rather than slurping the whole file, with a hard cap at 100,000 rows as a runaway guard.

scoopjoy/scoopjoy/utils/bulk_import.py
def _read_csv_streaming(file_url):
"""Memory-efficient CSV reading -- iterate, don't load all at once."""
file_doc = frappe.get_doc("File", {"file_url": file_url})
file_path = file_doc.get_full_path()
rows = []
with open(file_path, "r", encoding="utf-8-sig") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
rows.append(row)
# Safety: don't load more than 100K rows
if i >= 100000:
frappe.logger().warning("Bulk import: truncated at 100,000 rows")
break
return rows

Validation runs as a single pass over all rows. The trick to keeping it fast is pre-fetching the valid link targets (outlets, items, customers) into Python sets once, so each row checks membership in memory instead of hitting the database.

scoopjoy/scoopjoy/utils/bulk_import.py
def _validate_rows(rows):
"""
Validate all rows before inserting any.
Returns (valid_rows, errors).
"""
valid = []
errors = []
# Pre-fetch valid values for validation (single queries)
valid_outlets = set(frappe.get_all("Franchise Outlet", pluck="name"))
valid_items = set(frappe.get_all("Item", pluck="name"))
valid_customers = set(frappe.get_all("Customer", pluck="name"))
for i, row in enumerate(rows):
row_num = i + 2 # +2 for 1-indexed + header row
row_errors = []
# Required fields
if not row.get("posting_date"):
row_errors.append("posting_date is required")
else:
try:
getdate(row["posting_date"])
except Exception:
row_errors.append(f"Invalid date: {row['posting_date']}")
if not row.get("franchise_outlet"):
row_errors.append("franchise_outlet is required")
elif row["franchise_outlet"] not in valid_outlets:
row_errors.append(f"Unknown outlet: {row['franchise_outlet']}")
if not row.get("customer"):
row_errors.append("customer is required")
elif row["customer"] not in valid_customers:
row_errors.append(f"Unknown customer: {row['customer']}")
if not row.get("item_code"):
row_errors.append("item_code is required")
elif row["item_code"] not in valid_items:
row_errors.append(f"Unknown item: {row['item_code']}")
if not flt(row.get("qty")):
row_errors.append("qty must be > 0")
if not flt(row.get("rate")):
row_errors.append("rate must be > 0")
if row_errors:
errors.append({"row": row_num, "data": row, "errors": row_errors})
else:
valid.append(row)
return valid, errors

The fast path is _insert_chunk. It builds in-memory docs, stamps the fields that bulk_insert will not generate for you (name, creation, modified, owner, modified_by), then writes them in one shot. bulk_insert skips hooks, autoname, and validation — that is exactly why it is fast, and why the separate validation pass above is mandatory.

scoopjoy/scoopjoy/utils/bulk_import.py
def _insert_chunk(chunk):
"""
Bulk insert a chunk using frappe's bulk_insert.
No hooks, no validation -- raw speed.
"""
docs = []
timestamp = now()
for row in chunk:
doc = frappe.new_doc("Historical Transaction")
doc.posting_date = getdate(row["posting_date"])
doc.franchise_outlet = row["franchise_outlet"]
doc.customer = row["customer"]
doc.item_code = row["item_code"]
doc.item_name = row.get("item_name", "")
doc.qty = flt(row["qty"])
doc.rate = flt(row["rate"])
doc.amount = flt(row["qty"]) * flt(row["rate"])
doc.order_type = row.get("order_type", "Dine-In")
doc.source_file = row.get("_source_file", "")
# Required fields for bulk_insert (no autoname/hooks)
doc.name = frappe.generate_hash(length=10)
doc.creation = timestamp
doc.modified = timestamp
doc.owner = frappe.session.user
doc.modified_by = frappe.session.user
docs.append(doc)
# bulk_insert: ~25x faster than individual doc.insert()
bulk_insert("Historical Transaction", docs, ignore_duplicates=True, chunk_size=500)
frappe.db.commit()

When a chunk insert throws, the pipeline drops to _insert_chunk_safe, which inserts row by row through the normal ORM so it can pinpoint and skip the offending rows while keeping the good ones.

scoopjoy/scoopjoy/utils/bulk_import.py
def _insert_chunk_safe(chunk):
"""Fallback: row-by-row insert to isolate failures."""
inserted = 0
errors = []
for row in chunk:
try:
doc = frappe.new_doc("Historical Transaction")
doc.posting_date = getdate(row["posting_date"])
doc.franchise_outlet = row["franchise_outlet"]
doc.customer = row["customer"]
doc.item_code = row["item_code"]
doc.item_name = row.get("item_name", "")
doc.qty = flt(row["qty"])
doc.rate = flt(row["rate"])
doc.amount = flt(row["qty"]) * flt(row["rate"])
doc.order_type = row.get("order_type", "Dine-In")
doc.insert(ignore_permissions=True)
inserted += 1
if inserted % 100 == 0:
frappe.db.commit()
except Exception as e:
errors.append({"data": row, "error": str(e)})
frappe.db.commit()
return inserted, errors

Validation failures are written to a downloadable CSV and pushed to the user over realtime, so the importer gets an actionable error report instead of a silent partial import.

scoopjoy/scoopjoy/utils/bulk_import.py
def _report_validation_errors(errors):
"""Save validation errors to a file for user download."""
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["Row", "Errors", "posting_date", "franchise_outlet", "customer", "item_code"])
for err in errors:
writer.writerow([
err["row"],
"; ".join(err["errors"]),
err["data"].get("posting_date", ""),
err["data"].get("franchise_outlet", ""),
err["data"].get("customer", ""),
err["data"].get("item_code", ""),
])
# Save as a file
content = output.getvalue()
file_doc = frappe.get_doc({
"doctype": "File",
"file_name": f"import_errors_{frappe.utils.now_datetime().isoformat()}.csv",
"content": content,
"is_private": 1,
})
file_doc.save(ignore_permissions=True)
frappe.publish_realtime(
"bulk_import_errors",
{"file_url": file_doc.file_url, "error_count": len(errors)},
user=frappe.session.user,
)
def _create_import_log(total, inserted, validation_errors, insert_errors):
"""Create a log document for audit trail."""
frappe.get_doc({
"doctype": "Comment",
"comment_type": "Info",
"reference_doctype": "Historical Transaction",
"reference_name": "Historical Transaction",
"content": (
f"Bulk Import Complete:\n"
f"- Total rows: {total}\n"
f"- Inserted: {inserted}\n"
f"- Validation errors: {len(validation_errors)}\n"
f"- Insert errors: {len(insert_errors)}\n"
f"- Timestamp: {now()}\n"
f"- User: {frappe.session.user}"
),
}).insert(ignore_permissions=True)

To prove the throughput claim, this script races three approaches over the same record count. Run it from the bench console to see the numbers on your own hardware.

scoopjoy/scoopjoy/utils/bulk_benchmark.py
import frappe
import time
def benchmark_insert_methods(n=5000):
"""
Run from bench console:
>>> from scoopjoy.scoopjoy.utils.bulk_benchmark import benchmark_insert_methods
>>> benchmark_insert_methods(5000)
"""
results = {}
# --- Method 1: Loop with doc.insert() ---
start = time.time()
for i in range(n):
doc = frappe.new_doc("Historical Transaction")
doc.posting_date = "2025-01-01"
doc.franchise_outlet = "OUTLET-001"
doc.customer = "CUST-001"
doc.item_code = "ITEM-001"
doc.qty = 1
doc.rate = 100
doc.amount = 100
doc.insert(ignore_permissions=True)
if i % 500 == 0:
frappe.db.commit()
frappe.db.commit()
results["doc.insert() loop"] = time.time() - start
# Cleanup
frappe.db.sql("DELETE FROM `tabHistorical Transaction` WHERE franchise_outlet = 'OUTLET-001'")
frappe.db.commit()
# --- Method 2: frappe.model.document.bulk_insert ---
from frappe.model.document import bulk_insert
start = time.time()
docs = []
timestamp = frappe.utils.now()
for i in range(n):
doc = frappe.new_doc("Historical Transaction")
doc.posting_date = "2025-01-01"
doc.franchise_outlet = "OUTLET-001"
doc.customer = "CUST-001"
doc.item_code = "ITEM-001"
doc.qty = 1
doc.rate = 100
doc.amount = 100
doc.name = frappe.generate_hash(length=10)
doc.creation = timestamp
doc.modified = timestamp
doc.owner = "Administrator"
doc.modified_by = "Administrator"
docs.append(doc)
bulk_insert("Historical Transaction", docs, chunk_size=1000)
frappe.db.commit()
results["bulk_insert()"] = time.time() - start
# Cleanup
frappe.db.sql("DELETE FROM `tabHistorical Transaction` WHERE franchise_outlet = 'OUTLET-001'")
frappe.db.commit()
# --- Method 3: Raw SQL INSERT with executemany ---
start = time.time()
values = []
timestamp = frappe.utils.now()
for i in range(n):
name = frappe.generate_hash(length=10)
values.append((
name, "2025-01-01", "OUTLET-001", "CUST-001", "ITEM-001",
1, 100, 100, timestamp, timestamp, "Administrator", "Administrator"
))
# Insert in chunks
for j in range(0, len(values), 1000):
chunk = values[j:j + 1000]
placeholders = ", ".join(["%s"] * len(chunk[0]))
frappe.db.sql(f"""
INSERT INTO `tabHistorical Transaction`
(name, posting_date, franchise_outlet, customer, item_code,
qty, rate, amount, creation, modified, owner, modified_by)
VALUES ({placeholders})
""", chunk, as_list=True)
frappe.db.commit()
results["raw SQL executemany"] = time.time() - start
# Cleanup
frappe.db.sql("DELETE FROM `tabHistorical Transaction` WHERE franchise_outlet = 'OUTLET-001'")
frappe.db.commit()
# --- Print results ---
print(f"\nBenchmark: {n} records")
print("-" * 50)
for method, elapsed in sorted(results.items(), key=lambda x: x[1]):
rate = n / elapsed
print(f" {method:30s} {elapsed:8.2f}s ({rate:,.0f} records/sec)")
return results

The relative ordering is the lesson: raw executemany is fastest, bulk_insert is a close and far more ergonomic second, and a doc.insert() loop is roughly 30x slower because it pays ORM, hook, and autoname overhead on every row.

Benchmark output
Benchmark: 5000 records
--------------------------------------------------
raw SQL executemany 0.42s (11,905 records/sec)
bulk_insert() 0.85s (5,882 records/sec)
doc.insert() loop 12.50s (400 records/sec)
For 50,000 records:
raw SQL executemany 3.8s (13,158 records/sec)
bulk_insert() 8.5s (5,882 records/sec)
doc.insert() loop 125.0s (400 records/sec)

Client-side: trigger the import with a progress bar

Section titled “Client-side: trigger the import with a progress bar”

A custom Desk page provides the upload field and start button. The whitelisted method returns instantly; progress streams in over the realtime channel that frappe.publish_progress writes to, and the bulk_import_errors listener surfaces the downloadable error report when validation finds problems.

scoopjoy/scoopjoy/public/js/bulk_import.js
frappe.pages["bulk-import"].on_page_load = function (wrapper) {
const page = frappe.ui.make_app_page({
parent: wrapper,
title: "Bulk Import Historical Transactions",
single_column: true,
});
const upload_field = page.add_field({
fieldname: "import_file",
label: __("CSV File"),
fieldtype: "Attach",
reqd: 1,
});
page.set_primary_action(__("Start Import"), () => {
const file_url = upload_field.get_value();
if (!file_url) {
frappe.msgprint(__("Please attach a CSV file"));
return;
}
frappe.call({
method: "scoopjoy.scoopjoy.utils.bulk_import.enqueue_bulk_import",
args: { file_url },
callback: function (r) {
if (r.message) {
frappe.show_alert({
message: __("Import queued. Watch the progress bar."),
indicator: "green",
});
}
},
});
});
// Listen for error file notification
frappe.realtime.on("bulk_import_errors", (data) => {
frappe.msgprint({
title: __("Import Errors Found"),
message: __(
"{0} rows had validation errors. <a href='{1}'>Download error report</a>.",
[data.error_count, data.file_url]
),
indicator: "orange",
});
});
};
SymptomDiagnosisRecipe
Page loads slowlyN+1 queries in server log5.5
Report takes 10+ secondsFull table scan (check EXPLAIN)5.6
Dashboard query is inherently expensiveNeed pre-aggregated data5.7
Same expensive query runs repeatedlyAdd caching layer5.4
Bulk import times outUsing doc.insert() in a loop5.8
Complex query is hard to maintain as raw SQLUse frappe.qb5.1
Need a visual report with chartsScript Report5.2
Need a quick SQL-based reportQuery Report5.3