Skip to content

Error Handling & Retry Strategy

Production integrations with the Knosc API must handle three distinct failure modes: transient server errors, business rule violations, and the queued-write pattern that occurs when procurement tables are locked. This guide covers each case with concrete retry logic.


Error Response Shape

All errors return a consistent JSON body:

{
  "Message": "Item WIDGET-99 not found in Item Master.",
  "Code": "ItemMaster.NotFound"
}

Always branch on Code - not on Message, which is human-readable and may change. Use Message for logging only.


HTTP Status → Retry Decision

| Status | Category | Retry? |
|---|---|---|
| 200 with notification_type: Warning | Write queued | No - operation accepted |
| 400 | Validation / business rule | No - fix the request |
| 401 | Session expired | Yes - refresh token first |
| 403 | Privilege denied | No - escalate to admin |
| 422 | Missing CSRF token | No - include X-XSRF-TOKEN |
| 500 | Server error | Yes - with backoff |
| 502 / 503 / 504 | Gateway / overload | Yes - with backoff |

Pattern 1 - Automatic Token Refresh on 401

Access tokens expire. A robust client refreshes the token and retries the original request on receiving a 401, rather than aborting.

import requests

class KnoscClient:
    """Minimal Knosc API client that transparently refreshes an expired token.

    All traffic goes through one ``requests.Session`` so that whatever the
    login and refresh endpoints store on the session is reused by later calls.
    """

    def __init__(self, base_url: str):
        self.session = requests.Session()
        self.base_url = base_url

    def login(self, username: str, password: str):
        """POST the credentials to ``/login``; raise on an HTTP error status."""
        credentials = {
            "username": username,
            "password": password,
        }
        reply = self.session.post(f"{self.base_url}/login", json=credentials)
        reply.raise_for_status()

    def _refresh(self):
        """POST to ``/refresh``; raise on an HTTP error status."""
        self.session.post(f"{self.base_url}/refresh").raise_for_status()

    def request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Send a request and replay it once after a token refresh on 401.

        ``kwargs`` are forwarded unchanged to ``Session.request`` for both the
        first attempt and the replay. The response of the replay is returned
        as-is, whatever its status.
        """
        first_try = self.session.request(method, f"{self.base_url}{path}", **kwargs)
        if first_try.status_code != 401:
            return first_try

        # 401: obtain a fresh token, then send the very same request again.
        self._refresh()
        return self.session.request(method, f"{self.base_url}{path}", **kwargs)

Pattern 2 - Exponential Backoff on 5xx

Transient server errors should be retried with increasing delays. Cap retries at 3–5 attempts.

import time

def request_with_backoff(client, method: str, path: str, max_retries: int = 4, **kwargs) -> "requests.Response":
    """Call ``client.request`` and retry 5xx responses with exponential backoff.

    Args:
        client: Any object exposing ``request(method, path, **kwargs)`` whose
            result has a ``status_code`` attribute.
        method: HTTP method, forwarded to ``client.request``.
        path: Request path, forwarded to ``client.request``.
        max_retries: Total number of attempts (not extra retries). Values
            below 1 are treated as 1 so the request is always sent once.
        **kwargs: Forwarded unchanged to ``client.request`` on every attempt.

    Returns:
        The first response with a status below 500, or the last 5xx response
        once all attempts are used up.

    NOTE(review): every method is retried, including non-idempotent writes;
    confirm the API tolerates a repeated POST after a 5xx.
    """
    # Guard against max_retries <= 0, which would otherwise skip the loop and
    # leave `response` unbound at the final return.
    attempts = max(1, max_retries)
    delay = 1  # seconds

    for attempt in range(attempts):
        response = client.request(method, path, **kwargs)

        if response.status_code < 500:
            return response  # Success or client error - don't retry

        # Only wait when another attempt will actually follow.
        if attempt < attempts - 1:
            print(f"[{response.status_code}] Retry {attempt + 1}/{attempts - 1} in {delay}s…")
            time.sleep(delay)
            delay *= 2  # 1s → 2s → 4s with the default of 4 attempts

    return response  # Last 5xx response after exhausting all attempts

Pattern 3 - Handling the Queued Write (notification_type: Warning)

Write operations (POST, PUT, DELETE) on orders, inventory, and forecasts return HTTP 200 with a Warning notification when procurement tables are locked during an integration run. The operation has been accepted and queued - do not retry it.

def write_with_queue_awareness(client, method: str, path: str, payload: dict) -> bool:
    """Send a write and report whether it was applied or queued.

    The CSRF token is read from the ``csrf_access_token`` session cookie and
    sent as the ``X-XSRF-TOKEN`` header; the request itself goes through
    ``request_with_backoff``.

    Returns:
        True if the write completed immediately, False if the response carried
        ``notification_type == "Warning"`` (the write was queued).

    Raises:
        ValueError: for any status other than 200. The message is
            ``[Code] Message`` from the JSON error body, or
            ``[HTTP <status>] <start of body>`` when the body is not a JSON
            object (e.g. an HTML page from a gateway).
    """
    csrf = client.session.cookies.get("csrf_access_token")
    response = request_with_backoff(
        client, method, path,
        headers={"X-XSRF-TOKEN": csrf},
        json=payload,
    )

    if response.status_code != 200:
        # Error bodies are not guaranteed to be JSON once retries are
        # exhausted on a 5xx, so never let decoding mask the real failure.
        try:
            body = response.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            code, message = body.get("Code"), body.get("Message")
        else:
            code, message = f"HTTP {response.status_code}", response.text[:500]
        raise ValueError(f"[{code}] {message}")

    body = response.json()
    if body.get("notification_type") == "Warning":
        print("Write queued - tables are locked during integration. Change will apply automatically.")
        return False  # Queued

    return True  # Applied immediately

Pattern 4 - Pre-flight Validation

400 errors caused by referential integrity failures (e.g. ItemMaster.NotFound) will fail again on every retry, so retrying does not help. Validate that the referenced master data exists before writing orders.

def validate_item_numbers(client, item_numbers: list[str]) -> list[str]:
    """Pick out the entries of *item_numbers* that the Item Master lacks.

    Fetches ``GET /item-master`` once and compares against the
    ``"Item Number"`` field of every row under ``data.rows``. The result
    keeps the input order (and any duplicates).
    """
    listing = client.request("GET", "/item-master").json()

    existing = set()
    for record in listing["data"]["rows"]:
        existing.add(record["Item Number"])

    absent = []
    for number in item_numbers:
        if number not in existing:
            absent.append(number)
    return absent


def validate_supplier(client, supplier_number: str) -> bool:
    """Tell whether *supplier_number* is listed in the Supplier Master.

    Fetches ``GET /supplier-master`` and checks the ``"Supplier Number"``
    field of every row under ``data.rows``.
    """
    listing = client.request("GET", "/supplier-master").json()

    registered = set()
    for entry in listing["data"]["rows"]:
        registered.add(entry["Supplier Number"])

    return supplier_number in registered


# Before creating a PO: check every referenced item number against the Item
# Master and stop early if any is unknown, instead of sending a write that
# would come back as a non-retryable 400.
# NOTE(review): `client` is not defined in this snippet - it is expected to be
# an already authenticated client object created elsewhere.
missing_items = validate_item_numbers(client, ["ITEM-A", "WIDGET-99"])
if missing_items:
    raise ValueError(f"Items not in Item Master: {missing_items}")

Pattern 5 - Structured Error Logging

Log enough context to reproduce and fix failures without re-running the entire integration.

import logging
import json
from datetime import datetime, timezone

logger = logging.getLogger("knosc.integration")


def safe_write(client, method: str, path: str, payload: dict, context: "dict | None" = None) -> dict:
    """Send a write and emit one structured log record describing the outcome.

    Args:
        client: Object exposing ``session.cookies.get(...)`` and
            ``request(method, path, **kwargs)``.
        method: HTTP method of the write.
        path: Request path.
        payload: JSON body to send.
        context: Arbitrary metadata attached to the log record (e.g. source
            ERP id). Defaults to an empty dict in the record.

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        ValueError: for any status other than 200, after the failure has been
            logged. The message is ``[Code] Message`` from the JSON error
            body, or ``[HTTP <status>] <start of body>`` when the body is not
            a JSON object (e.g. an HTML page from a gateway).
    """
    csrf = client.session.cookies.get("csrf_access_token")
    response = client.request(
        method, path,
        headers={"X-XSRF-TOKEN": csrf},
        json=payload,
    )

    log_record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "method": method,
        "path": path,
        "status": response.status_code,
        "context": context or {},
    }

    if response.status_code == 200:
        body = response.json()
        log_record["queued"] = body.get("notification_type") == "Warning"
        logger.info("write_ok", extra=log_record)
        return body

    # Decode defensively: if the error body is not JSON, the failure must
    # still reach the log instead of being replaced by a decode error.
    try:
        body = response.json()
    except ValueError:
        body = None
    if isinstance(body, dict):
        code, message = body.get("Code"), body.get("Message")
    else:
        code, message = f"HTTP {response.status_code}", response.text[:500]

    log_record["error_code"] = code
    log_record["error_message"] = message
    # Truncated so an oversized payload cannot bloat the log record.
    log_record["payload_snippet"] = json.dumps(payload)[:500]
    logger.error("write_failed", extra=log_record)
    raise ValueError(f"[{code}] {message}")

Full Client with All Patterns Combined

import requests
import time
import logging

logger = logging.getLogger("knosc")


class KnoscClient:
    """Knosc API client combining token refresh, CSRF headers and 5xx backoff.

    ``get``/``post``/``put``/``delete`` return the decoded JSON body whatever
    the HTTP status, so callers must inspect the body for error codes.
    """

    def __init__(self, base_url: str, max_retries: int = 4):
        self.base_url = base_url
        # Total number of attempts per request; values below 1 behave as 1.
        self.max_retries = max_retries
        self.session = requests.Session()

    def login(self, username: str, password: str):
        """POST the credentials to ``/login``; raise on an HTTP error status."""
        r = self.session.post(f"{self.base_url}/login", json={"username": username, "password": password})
        r.raise_for_status()

    def _refresh(self):
        """POST to ``/refresh``; raise on an HTTP error status."""
        self.session.post(f"{self.base_url}/refresh").raise_for_status()

    @property
    def csrf(self) -> str:
        """Current ``csrf_access_token`` cookie value, or ``""`` if absent."""
        return self.session.cookies.get("csrf_access_token", "")

    def get(self, path: str, **kwargs) -> dict:
        """GET *path* and return the decoded JSON body."""
        return self._request("GET", path, **kwargs).json()

    def post(self, path: str, payload: dict) -> dict:
        """POST *payload* as JSON with the CSRF header; return the JSON body."""
        return self._request("POST", path, headers={"X-XSRF-TOKEN": self.csrf}, json=payload).json()

    def put(self, path: str, payload: dict) -> dict:
        """PUT *payload* as JSON with the CSRF header; return the JSON body."""
        return self._request("PUT", path, headers={"X-XSRF-TOKEN": self.csrf}, json=payload).json()

    def delete(self, path: str) -> dict:
        """DELETE *path* with the CSRF header; return the JSON body."""
        return self._request("DELETE", path, headers={"X-XSRF-TOKEN": self.csrf}).json()

    def _resync_csrf(self, kwargs: dict) -> dict:
        """Return *kwargs* with an existing CSRF header set to the current token."""
        headers = kwargs.get("headers") or {}
        token = self.csrf
        if "X-XSRF-TOKEN" not in headers or not token:
            return kwargs
        # Build new dicts so the caller's headers are never mutated.
        return {**kwargs, "headers": {**headers, "X-XSRF-TOKEN": token}}

    def _request(self, method: str, path: str, **kwargs) -> "requests.Response":
        """Send a request with one token refresh on 401 and backoff on 5xx.

        Returns the first response with a status below 500, or the last 5xx
        response once ``max_retries`` attempts have been made. The delay
        starts at one second and doubles after every retried attempt.
        """
        url = f"{self.base_url}{path}"
        # Always send at least once, even if max_retries is 0 or negative.
        attempts = max(1, self.max_retries)
        delay = 1
        refreshed = False
        attempt = 0

        while True:
            attempt += 1
            response = self.session.request(method, url, **kwargs)

            # Refresh at most once per call, on whichever attempt the 401
            # shows up (the token may expire while waiting between retries).
            if response.status_code == 401 and not refreshed:
                logger.info("Token expired - refreshing")
                self._refresh()
                refreshed = True
                # NOTE(review): assumes /refresh may rotate the
                # csrf_access_token cookie; re-reading it is a no-op if not.
                kwargs = self._resync_csrf(kwargs)
                response = self.session.request(method, url, **kwargs)

            if response.status_code < 500 or attempt >= attempts:
                return response

            logger.warning(
                "[%s] %s %s - retry %d in %ss",
                response.status_code, method, path, attempt, delay,
            )
            time.sleep(delay)
            delay *= 2

Common Error Codes - Quick Reference

| Code | HTTP | Action |
|---|---|---|
| ItemMaster.NotFound | 400 | Ensure item exists before writing orders |
| SupplierMaster.NotFound | 400 | Ensure supplier exists before writing POs |
| WarehouseMaster.NotFound | 400 | Verify warehouse code is active |
| OrderList.MultipleWarehouses | 400 | Split the order by warehouse |
| OrderDetails.DuplicateRows | 400 | Deduplicate item/sub-item combinations |
| User.NotPrivileged | 403 | Contact administrator to update role |
| JSON.Invalid | 400 | Validate JSON before sending |