# Error Handling & Retry Strategy
Production integrations with the Knosc API must handle three distinct failure modes: transient server errors, business rule violations, and the queued-write pattern that occurs when procurement tables are locked. This guide covers each case with concrete retry logic.
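## Error Response Shape
All errors return a consistent JSON body that carries a machine-readable `Code` and a human-readable `Message`, for example `{"Code": "ItemMaster.NotFound", "Message": "<human-readable description>"}`.
Always branch on `Code`, never on `Message`: `Message` is human-readable text and may change. Use `Message` for logging only.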
## HTTP Status → Retry Decision
| Status | Category | Retry? |
|---|---|---|
| 200 with `notification_type: Warning` | Write queued | No - operation accepted |
| 400 | Validation / business rule | No - fix the request |
| 401 | Session expired | Yes - refresh token first |
| 403 | Privilege denied | No - escalate to admin |
| 422 | Missing CSRF token | No - include `X-XSRF-TOKEN` |
| 500 | Server error | Yes - with backoff |
| 502 / 503 / 504 | Gateway / overload | Yes - with backoff |
## Pattern 1 - Automatic Token Refresh on 401
Access tokens expire. A robust client refreshes the token and retries the original request on receiving a 401, rather than aborting.
import requests
class KnoscClient:
    """Minimal Knosc API client that refreshes an expired token on 401.

    All calls go through one ``requests.Session`` so that cookies set by
    ``/login`` and ``/refresh`` are reused on later requests.

    Args:
        base_url: Root URL of the API; request paths are appended to it as-is.
        timeout: Seconds ``requests`` waits on the server before giving up.
            Used for every call unless the caller passes its own ``timeout``;
            pass ``None`` to wait indefinitely.
    """

    def __init__(self, base_url: str, timeout: float = 30):
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()

    def login(self, username: str, password: str):
        """POST the credentials to ``/login``.

        Raises:
            requests.HTTPError: If the server answers with a 4xx/5xx status.
        """
        r = self.session.post(
            f"{self.base_url}/login",
            json={
                "username": username,
                "password": password,
            },
            timeout=self.timeout,
        )
        r.raise_for_status()

    def _refresh(self):
        """POST to ``/refresh``; raises ``requests.HTTPError`` on 4xx/5xx."""
        r = self.session.post(f"{self.base_url}/refresh", timeout=self.timeout)
        r.raise_for_status()

    def request(self, method: str, path: str, **kwargs) -> "requests.Response":
        """Make a request, refreshing the token once on 401.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended to ``base_url``.
            **kwargs: Passed through to ``requests.Session.request``.

        Returns:
            The response to the original call, or to the single retry that
            follows a 401. A second 401 is returned to the caller unchanged.

        Raises:
            requests.HTTPError: If the ``/refresh`` call itself fails.
        """
        url = f"{self.base_url}{path}"
        # Without a timeout a stalled connection blocks the caller forever.
        kwargs.setdefault("timeout", self.timeout)

        response = self.session.request(method, url, **kwargs)
        if response.status_code != 401:
            return response

        self._refresh()

        # NOTE(review): assumes /refresh may rotate the csrf_access_token
        # cookie - confirm against the API. Re-reading it is harmless if the
        # value did not change. The caller's dict is copied, never mutated.
        headers = kwargs.get("headers")
        fresh_csrf = self.session.cookies.get("csrf_access_token")
        if headers and "X-XSRF-TOKEN" in headers and fresh_csrf:
            kwargs["headers"] = {**headers, "X-XSRF-TOKEN": fresh_csrf}

        # Retry with fresh token
        return self.session.request(method, url, **kwargs)
## Pattern 2 - Exponential Backoff on 5xx
Transient server errors should be retried with increasing delays. Cap retries at 3–5 attempts.
import time
def request_with_backoff(client, method: str, path: str, max_retries: int = 4, **kwargs) -> "requests.Response":
    """Retry on 5xx with exponential backoff.

    Args:
        client: Object exposing ``request(method, path, **kwargs)`` that
            returns a response with a ``status_code`` attribute.
        method: HTTP verb forwarded to ``client.request``.
        path: Request path forwarded to ``client.request``.
        max_retries: Total number of attempts, including the first one.
            Values below 1 are treated as 1 so the request is always sent.
        **kwargs: Forwarded unchanged to ``client.request`` on every attempt.

    Returns:
        The first response with a status below 500, or the last 5xx response
        once all attempts are used up. The caller still has to check it.
    """
    # Always try at least once; previously max_retries <= 0 skipped the loop
    # and failed on an unbound ``response``.
    attempts = max(1, max_retries)
    delay = 1  # seconds
    attempt = 0
    while True:
        response = client.request(method, path, **kwargs)
        if response.status_code < 500:
            return response  # Success or client error - don't retry
        if attempt >= attempts - 1:
            return response  # Out of attempts - hand back the last 5xx
        print(f"[{response.status_code}] Retry {attempt + 1}/{attempts - 1} in {delay}s…")
        time.sleep(delay)
        delay *= 2  # 1s → 2s → 4s with the default of 4 attempts
        attempt += 1
## Pattern 3 - Handling the Queued Write (`notification_type: Warning`)
Write operations (POST, PUT, DELETE) on orders, inventory, and forecasts return HTTP 200 with a Warning notification when procurement tables are locked during an integration run. The operation has been accepted and queued - do not retry it.
def write_with_queue_awareness(client, method: str, path: str, payload: dict) -> bool:
    """Send a write and report whether it was applied or queued.

    The CSRF token is read from the ``csrf_access_token`` session cookie and
    sent as ``X-XSRF-TOKEN``. The call goes through ``request_with_backoff``.

    Args:
        client: Client exposing ``session.cookies`` and ``request``.
        method: HTTP verb of the write, e.g. ``"POST"``.
        path: Request path.
        payload: JSON-serialisable request body.

    Returns:
        True if the write completed immediately, False if it was queued
        (HTTP 200 whose body has ``notification_type == "Warning"``).

    Raises:
        ValueError: On any non-200 response - validation errors as well as
            5xx responses that survived the retries. The message is
            ``[Code] Message`` when the body is a JSON object.
    """
    # NOTE(review): 5xx retries re-send the same write - confirm the target
    # endpoints tolerate a repeated POST/PUT/DELETE.
    csrf = client.session.cookies.get("csrf_access_token")
    response = request_with_backoff(
        client, method, path,
        headers={"X-XSRF-TOKEN": csrf},
        json=payload,
    )

    if response.status_code != 200:
        # Gateway errors may carry HTML or an empty body rather than the API's
        # JSON error shape; don't let a decode error mask the HTTP failure.
        try:
            body = response.json()
        except ValueError:
            body = None
        if isinstance(body, dict):
            raise ValueError(f"[{body.get('Code')}] {body.get('Message')}")
        raise ValueError(f"[HTTP {response.status_code}] {response.text[:200]}")

    body = response.json()
    if body.get("notification_type") == "Warning":
        print("Write queued - tables are locked during integration. Change will apply automatically.")
        return False  # Queued
    return True  # Applied immediately
## Pattern 4 - Pre-flight Validation
400 errors caused by referential integrity failures (e.g. `ItemMaster.NotFound`) should not be retried: the same request will keep failing until the missing master data exists. Validate that referenced master data exists before writing orders.
def validate_item_numbers(client, item_numbers: list[str]) -> list[str]:
    """Return a list of item numbers that do NOT exist in the Item Master.

    Args:
        client: Object exposing ``request(method, path)``.
        item_numbers: Item numbers to check.

    Returns:
        The entries of ``item_numbers`` that are absent from the
        ``"Item Number"`` column of ``/item-master``, in input order and with
        duplicates kept.

    Raises:
        ValueError: If the lookup does not return HTTP 200.
    """
    response = client.request("GET", "/item-master")
    # An error body has no ["data"]["rows"]; fail with the status instead of
    # an opaque KeyError.
    if response.status_code != 200:
        raise ValueError(f"Item Master lookup failed with HTTP {response.status_code}")
    rows = response.json()["data"]["rows"]
    known = {row["Item Number"] for row in rows}
    return [n for n in item_numbers if n not in known]
def validate_supplier(client, supplier_number: str) -> bool:
    """Return True if ``supplier_number`` exists in the Supplier Master.

    Args:
        client: Object exposing ``request(method, path)``.
        supplier_number: Supplier number to look up in the
            ``"Supplier Number"`` column of ``/supplier-master``.

    Raises:
        ValueError: If the lookup does not return HTTP 200.
    """
    response = client.request("GET", "/supplier-master")
    # An error body has no ["data"]["rows"]; fail with the status instead of
    # an opaque KeyError.
    if response.status_code != 200:
        raise ValueError(f"Supplier Master lookup failed with HTTP {response.status_code}")
    suppliers = response.json()["data"]["rows"]
    known = {row["Supplier Number"] for row in suppliers}
    return supplier_number in known
# Before creating a PO:
# Usage example: check the referenced item numbers first and stop before any
# write is sent. `client` is not defined in this snippet - presumably an
# already logged-in client exposing request(); confirm against the caller.
missing_items = validate_item_numbers(client, ["ITEM-A", "WIDGET-99"])
if missing_items:
    # A non-empty list means at least one item number is not in the Item Master.
    raise ValueError(f"Items not in Item Master: {missing_items}")
## Pattern 5 - Structured Error Logging
Log enough context to reproduce and fix failures without re-running the entire integration.
import logging
import json
from datetime import datetime, timezone
logger = logging.getLogger("knosc.integration")
def safe_write(client, method: str, path: str, payload: dict, context: "dict | None" = None) -> dict:
    """Write with structured error capture.

    Sends one request through ``client.request`` with the CSRF token from the
    ``csrf_access_token`` cookie, then logs the outcome on the module logger
    with the record fields passed via ``extra``.

    Args:
        client: Client exposing ``session.cookies`` and ``request``.
        method: HTTP verb of the write.
        path: Request path.
        payload: JSON-serialisable request body.
        context: arbitrary metadata attached to the log record
            (e.g. source ERP id). Defaults to an empty dict.

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        ValueError: On any non-200 response, after the failure is logged.
    """
    csrf = client.session.cookies.get("csrf_access_token")
    response = client.request(
        method, path,
        headers={"X-XSRF-TOKEN": csrf},
        json=payload,
    )
    log_record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "method": method,
        "path": path,
        "status": response.status_code,
        "context": context or {},
    }

    if response.status_code == 200:
        body = response.json()
        log_record["queued"] = body.get("notification_type") == "Warning"
        logger.info("write_ok", extra=log_record)
        return body

    # Gateway errors may not carry the API's JSON error shape. Decode
    # defensively so the failure is still logged rather than lost to a decode
    # error.
    try:
        body = response.json()
    except ValueError:
        body = None
    if isinstance(body, dict):
        error_code = body.get("Code")
        error_message = body.get("Message")
    else:
        error_code = f"HTTP {response.status_code}"
        error_message = response.text[:500]

    log_record["error_code"] = error_code
    log_record["error_message"] = error_message
    # Truncated so an oversized payload cannot bloat the log record.
    log_record["payload_snippet"] = json.dumps(payload)[:500]
    logger.error("write_failed", extra=log_record)
    raise ValueError(f"[{error_code}] {error_message}")
## Full Client with All Patterns Combined
import requests
import time
import logging
logger = logging.getLogger("knosc")
class KnoscClient:
    """Knosc API client combining token refresh, 5xx backoff and CSRF headers.

    Args:
        base_url: Root URL of the API; request paths are appended to it as-is.
        max_retries: Total number of attempts per call, including the first.
            Values below 1 still result in one attempt.
        timeout: Seconds ``requests`` waits on the server before giving up.
            Used for every call unless overridden per request; ``None`` waits
            indefinitely.
    """

    def __init__(self, base_url: str, max_retries: int = 4, timeout: float = 30):
        self.base_url = base_url
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = requests.Session()

    def login(self, username: str, password: str):
        """POST the credentials to ``/login``; raises ``requests.HTTPError`` on 4xx/5xx."""
        r = self.session.post(
            f"{self.base_url}/login",
            json={"username": username, "password": password},
            timeout=self.timeout,
        )
        r.raise_for_status()

    def _refresh(self):
        """POST to ``/refresh``; raises ``requests.HTTPError`` on 4xx/5xx."""
        self.session.post(f"{self.base_url}/refresh", timeout=self.timeout).raise_for_status()

    @property
    def csrf(self) -> str:
        """Current ``csrf_access_token`` cookie value, or ``""`` if unset."""
        return self.session.cookies.get("csrf_access_token", "")

    def get(self, path: str, **kwargs) -> dict:
        """GET ``path`` and return the decoded JSON body."""
        return self._request("GET", path, **kwargs).json()

    def post(self, path: str, payload: dict) -> dict:
        """POST ``payload`` as JSON with the CSRF header; return the decoded body."""
        return self._request("POST", path, headers={"X-XSRF-TOKEN": self.csrf}, json=payload).json()

    def put(self, path: str, payload: dict) -> dict:
        """PUT ``payload`` as JSON with the CSRF header; return the decoded body."""
        return self._request("PUT", path, headers={"X-XSRF-TOKEN": self.csrf}, json=payload).json()

    def delete(self, path: str) -> dict:
        """DELETE ``path`` with the CSRF header; return the decoded body."""
        return self._request("DELETE", path, headers={"X-XSRF-TOKEN": self.csrf}).json()

    def _request(self, method: str, path: str, **kwargs) -> "requests.Response":
        """Send one logical request with token refresh and 5xx backoff.

        A 401 triggers at most one ``/refresh`` per call, on whichever attempt
        it occurs. 5xx responses are retried after 1s, 2s, 4s, ... until
        ``max_retries`` attempts have been made.

        Returns:
            The first response with a status below 500, or the last 5xx
            response once the attempts are used up.

        Raises:
            requests.HTTPError: If the ``/refresh`` call itself fails.
        """
        url = f"{self.base_url}{path}"
        # Without a timeout a stalled connection blocks the caller forever.
        kwargs.setdefault("timeout", self.timeout)
        delay = 1
        attempt = 1
        # Tracked separately from the attempt counter so that a token expiring
        # during a backoff wait is still refreshed.
        refreshed = False

        while True:
            response = self.session.request(method, url, **kwargs)

            # Refresh once on 401
            if response.status_code == 401 and not refreshed:
                logger.info("Token expired - refreshing")
                self._refresh()
                refreshed = True
                # NOTE(review): assumes /refresh may rotate the
                # csrf_access_token cookie - confirm against the API. The
                # header dict is copied, never mutated in place.
                headers = kwargs.get("headers")
                if headers and "X-XSRF-TOKEN" in headers:
                    kwargs["headers"] = {**headers, "X-XSRF-TOKEN": self.csrf}
                response = self.session.request(method, url, **kwargs)

            # Anything below 500 is final, and so is the last allowed attempt.
            # This also guarantees one attempt when max_retries <= 0.
            if response.status_code < 500 or attempt >= self.max_retries:
                return response

            # Retry on 5xx
            logger.warning(
                "[%s] %s %s - retry %s in %ss",
                response.status_code, method, path, attempt, delay,
            )
            time.sleep(delay)
            delay *= 2
            attempt += 1
## Common Error Codes - Quick Reference
| Code | HTTP | Action |
|---|---|---|
| `ItemMaster.NotFound` | 400 | Ensure item exists before writing orders |
| `SupplierMaster.NotFound` | 400 | Ensure supplier exists before writing POs |
| `WarehouseMaster.NotFound` | 400 | Verify warehouse code is active |
| `OrderList.MultipleWarehouses` | 400 | Split the order by warehouse |
| `OrderDetails.DuplicateRows` | 400 | Deduplicate item/sub-item combinations |
| `User.NotPrivileged` | 403 | Contact administrator to update role |
| `JSON.Invalid` | 400 | Validate JSON before sending |