Bulk Data Import

The Knosc API supports bulk data imports via file upload. This is the recommended approach for loading large volumes of master data or transactional records from an ERP or data warehouse - it is significantly faster than creating records one at a time through the REST endpoints.


How It Works

  1. Upload a file (XLSX or CSV) to POST /api/job/{trigger}
  2. The server queues the file for processing and returns a job_id
  3. Poll GET /api/job?jobs=[...] until the job status is completed or failed
  4. On failure, inspect the errors array to identify and correct bad rows

Supported Upload Types

Trigger                 What It Imports
item-master             Item catalogue
customer-master         Customer records
supplier-master         Supplier records
warehouse-master        Warehouse records
unit-master             Units of measure
external-item-master    External item ID mappings
procurement             Purchase orders + line items
demand                  Sales orders + line items
manufacturing           Manufacturing orders
inventory               Inventory snapshot
forecast                Forecast data
bill-of-materials       BOM structure
supplier-allocation     Supplier allocation percentages

Step 1 - Download the Template

Before uploading, download the blank template for the trigger you intend to use:

curl -b cookies.txt "https://acme.knosc.com/api/templates" -o templates.json

The response lists download links for each template type. Use the correct template - the column layout must match exactly.
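If you want to script this step, the sketch below pulls the download link for one trigger out of templates.json. The exact shape of that response is not documented here, so the top-level "templates" key and the per-entry "trigger"/"url" keys are assumptions — adjust them to match the actual payload.

```python
def template_link(payload: dict, trigger: str) -> str:
    """Find the download URL for one trigger in the templates listing.

    Assumes (not confirmed by the API docs) a response shaped like:
      {"templates": [{"trigger": "item-master", "url": "..."}, ...]}
    """
    for entry in payload.get("templates", []):
        if entry.get("trigger") == trigger:
            return entry["url"]
    raise KeyError(f"No template listed for trigger '{trigger}'")

# Usage, with an authenticated requests session:
# payload = session.get("https://acme.knosc.com/api/templates").json()
# url = template_link(payload, "item-master")
# open("item-master-template.xlsx", "wb").write(session.get(url).content)
```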


Step 2 - Upload the File

# Extract the CSRF token from the cookie jar
CSRF=$(grep csrf_access_token cookies.txt | awk '{print $NF}')

curl -b cookies.txt -X POST \
  "https://acme.knosc.com/api/job/item-master" \
  -H "X-XSRF-TOKEN: $CSRF" \
  -F "file=@item-master-export.xlsx"

Response:

{
  "message": "File uploaded. Processing started.",
  "job_id": "item-master-upload-a1b2c3",
  "status": "queued"
}

Save the job_id - you need it to check status.


Step 3 - Poll for Completion

curl -b cookies.txt \
  "https://acme.knosc.com/api/job?jobs=%5B%7B%22trigger%22%3A%22item-master%22%2C%22job_id%22%3A%22item-master-upload-a1b2c3%22%7D%5D"
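The jobs query parameter is a URL-encoded JSON array. Rather than hand-encoding it, you can build it programmatically; this sketch reproduces the encoded string used above:

```python
import json
from urllib.parse import quote

jobs = [{"trigger": "item-master", "job_id": "item-master-upload-a1b2c3"}]

# Compact JSON (no spaces after separators), then percent-encode
# every reserved character so it is safe in a query string.
encoded = quote(json.dumps(jobs, separators=(",", ":")), safe="")
print(f"https://acme.knosc.com/api/job?jobs={encoded}")
```

With the requests library you can skip manual encoding entirely and pass the JSON string via params={"jobs": json.dumps(jobs)}, as the Python workflow below does.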

Response (running):

{
  "jobs": [
    {
      "trigger": "item-master",
      "job_id": "item-master-upload-a1b2c3",
      "status": "running",
      "rows_processed": 340,
      "errors": []
    }
  ]
}

Response (completed):

{
  "jobs": [
    {
      "trigger": "item-master",
      "job_id": "item-master-upload-a1b2c3",
      "status": "completed",
      "rows_processed": 1500,
      "errors": [],
      "completed_at": "2024-06-01T08:45:22Z"
    }
  ]
}

Response (failed):

{
  "jobs": [
    {
      "trigger": "item-master",
      "job_id": "item-master-upload-a1b2c3",
      "status": "failed",
      "rows_processed": 412,
      "errors": [
        { "row": 413, "message": "Item Unit 'EACH' not found in Unit Master" },
        { "row": 419, "message": "Duplicate Item Number: WIDGET-99" }
      ]
    }
  ]
}

Status      Description
queued      Waiting for a worker
running     Actively processing rows
completed   All rows imported successfully
failed      Import halted - check the errors array

Python - Full Upload + Poll Workflow

import json
import time
import requests

BASE_URL = "https://acme.knosc.com/api"

def upload_and_wait(session, trigger: str, file_path: str, poll_interval: int = 5) -> dict:
    """Upload a file and block until the job completes or fails."""
    csrf = session.cookies.get("csrf_access_token")

    # Upload
    with open(file_path, "rb") as f:
        response = session.post(
            f"{BASE_URL}/job/{trigger}",
            headers={"X-XSRF-TOKEN": csrf},
            files={"file": (file_path, f)},
        )
    response.raise_for_status()
    job_id = response.json()["job_id"]
    print(f"[{trigger}] Uploaded. Job ID: {job_id}")

    # Poll
    while True:
        time.sleep(poll_interval)
        status_response = session.get(
            f"{BASE_URL}/job",
            params={"jobs": json.dumps([{"trigger": trigger, "job_id": job_id}])},
        )
        status_response.raise_for_status()
        job = status_response.json()["jobs"][0]
        status = job["status"]
        print(f"[{trigger}] Status: {status} ({job.get('rows_processed', 0)} rows processed)")

        if status == "completed":
            print(f"[{trigger}] Done. {job['rows_processed']} rows imported.")
            return job
        elif status == "failed":
            print(f"[{trigger}] Failed. Errors:")
            for err in job.get("errors", []):
                print(f"  Row {err['row']}: {err['message']}")
            raise RuntimeError(f"Import failed for trigger '{trigger}'")


# Example: load master data in dependency order.
# 'session' is an authenticated requests.Session (login cookies already set).
upload_and_wait(session, "unit-master",      "exports/units.xlsx")
upload_and_wait(session, "warehouse-master", "exports/warehouses.xlsx")
upload_and_wait(session, "supplier-master",  "exports/suppliers.xlsx")
upload_and_wait(session, "customer-master",  "exports/customers.xlsx")
upload_and_wait(session, "item-master",      "exports/items.xlsx")

# Then transactional data
upload_and_wait(session, "inventory",   "exports/inventory.xlsx")
upload_and_wait(session, "procurement", "exports/purchase-orders.xlsx")
upload_and_wait(session, "demand",      "exports/sales-orders.xlsx")

Import Order - Dependencies

Some uploads depend on master data records existing first. Load in this order to avoid NotFound errors:

1. unit-master
2. warehouse-master
3. supplier-master  ←  procurement depends on this
4. customer-master  ←  demand depends on this
5. item-master      ←  all transactional data depends on this
─────────────────────────────────────────────
6. external-item-master
7. bill-of-materials
8. supplier-allocation
─────────────────────────────────────────────
9. inventory
10. procurement
11. demand
12. manufacturing
13. forecast

Queued Jobs During Integration Runs

If a data integration is currently running, upload jobs are queued rather than rejected. The response still returns a job_id and a queued status. Polling will show the job transitioning through queued → running → completed once the integration releases its table locks.

Do not retry the upload if you receive queued - the original job is already in the queue.
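Because a job can sit in queued for as long as the integration holds its table locks, a fixed 5-second poll can be noisier than necessary. One option, not part of the API itself, is a capped exponential backoff on the poll interval:

```python
def poll_interval(attempt: int, base: float = 5.0, cap: float = 60.0) -> float:
    """Delay before poll number `attempt` (0-based): 5s, 10s, 20s, ... capped at 60s."""
    return min(base * (2 ** attempt), cap)

# In the polling loop, replace the fixed sleep with:
# time.sleep(poll_interval(attempt))
```

This keeps polling responsive for short waits while backing off to one request per minute when an integration run delays the job.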