# SimpleQ — llms-full.txt > Complete documentation for SimpleQ, a managed job queue service (Queues as a Service). This file concatenates every concept page, example, and a condensed API reference so that LLM agents can answer implementation questions from a single fetch. Source: https://docs.simpleq.io Manifest: https://docs.simpleq.io/llms.txt Generated: 2026-06-08 --- # Concepts ## Ack mode Source: https://docs.simpleq.io/concepts/ack-mode A queue in **ack mode** acknowledges the webhook delivery immediately and waits for your worker to report the real outcome out of band. Use it for work that might exceed the 15-second webhook ceiling — most LLM calls, anything synchronous that touches a slow downstream — and for work where you want to relay backpressure ([defer](./backpressure)) or signal a hard failure ([nack](./backpressure)) cleanly. ## Why it exists SimpleQ enforces a 15-second timeout on every webhook request. A standard-mode webhook that takes longer is aborted and treated as a failure. That's the right default for fast handlers — but it rules out anything that routinely runs longer: a 40-second Claude generation, a video transcode, a slow third-party API. Ack mode splits the two concerns: the webhook handler returns `2xx` the moment it has the job in hand (well under the 15s ceiling), and the worker reports the real outcome later through one of three callbacks. ## The three outcomes | Callback | When to use | Effect | | --- | --- | --- | | `POST /v1/jobs/:id/ack` | The work succeeded. | Job → `completed`. | | `POST /v1/jobs/:id/nack` `{ retryable }` | The work failed. | `retryable: true` → retry with backoff (counts against `maxAttempts`). `retryable: false` → straight to the DLQ. | | `POST /v1/jobs/:id/defer` `{ retryAfter }` | A downstream rate-limited you. | Job is held and redelivered after `retryAfter` seconds. **No attempt is burned.** See [backpressure](./backpressure). | All three require a queue with `mode: 'ack'`. Authenticate with your API key (`Authorization: Bearer sq_live_...`). ## Minimal worker shape ::: code-group ```js [Express] app.post('/webhook', async (req, res) => { // verifySignature → see https://docs.simpleq.io/concepts/signature-verification if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } // Ack mode: 200 immediately, then run the work out of band. res.status(200).end(); const job = JSON.parse(req.body.toString('utf8')); try { await doTheWork(job.payload); await callback(job.id, 'ack'); // → POST /v1/jobs/:id/ack } catch (err) { await callback(job.id, 'nack', { retryable: isTransient(err) }); // → POST /v1/jobs/:id/nack } // For downstream backpressure (429/503/529), see https://docs.simpleq.io/concepts/backpressure // → callback(job.id, 'defer', { retryAfter }) }); ``` ```python [Python (FastAPI)] @app.post("/webhook") async def webhook( request: Request, background_tasks: BackgroundTasks, x_simpleq_signature: str | None = Header(default=None), ): # verify_signature → see https://docs.simpleq.io/concepts/signature-verification raw = await request.body() # raw bytes, BEFORE parsing if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) # Ack mode: 200 immediately, then run the work out of band. job = json.loads(raw) # BackgroundTasks runs in-process — if this worker crashes mid-job the # work is lost, but ackTimeout + ackTimeoutAction: "retry" redelivers it. background_tasks.add_task(process_job, job) return Response(status_code=200) async def process_job(job: dict) -> None: try: await do_the_work(job["payload"]) await callback(job["id"], "ack") # → POST /v1/jobs/:id/ack except Exception as err: await callback(job["id"], "nack", {"retryable": is_transient(err)}) # → POST /v1/jobs/:id/nack # For downstream backpressure (429/503/529), see https://docs.simpleq.io/concepts/backpressure # → callback(job["id"], "defer", {"retryAfter": ...}) ``` ::: A complete handler — including the raw-body access needed to preserve the body for signature verification, plus `verifySignature` and `callback` defined — lives in the [generic ack worker example](/examples/generic-ack-worker/). ## When to use ack mode vs standard mode | Use **standard** when | Use **ack** when | | --- | --- | | The handler finishes in well under 15s. | The handler routinely runs longer than 15s. | | You want SimpleQ to mark the job done the moment your webhook returns 2xx. | You need to report success / failure / backpressure separately from "I received it." | | The work is cheap to repeat on retry. | The work is expensive (LLM tokens, downstream quota) and you want explicit control over what gets retried. | If you're unsure, start with standard. Switch to ack the first time you find yourself wishing you could say "I got it, give me a minute" or "the downstream just rate-limited me." ## Staying resilient across redeliveries A worker that succeeds (spends tokens, writes results) then dies before sending `/ack` will be redelivered after `ackTimeout`. SimpleQ guarantees durable delivery, and we make it easy to dedupe redeliveries so your worker stays resilient — see [idempotency](./idempotency) for the pattern. --- ## Backpressure Source: https://docs.simpleq.io/concepts/backpressure When a downstream returns 429, 503, or 529, your work didn't fail — it was rate-limited. SimpleQ models this as **defer**: the job is held, redelivered after a delay, and no attempt is burned. The `maxAttempts` budget is spent on real failures only, so a job can be deferred indefinitely against a sustained rate limit and still complete the moment capacity returns. See [ack mode](./ack-mode) for how defer fits alongside `ack` and `nack`. ## Trigger from ack mode In ack mode, your worker reports a defer with a callback: ``` POST /v1/jobs/:id/defer { "retryAfter": 30, "reason": "anthropic 429" } ``` `retryAfter` is in **seconds** (0 to 3600). This matches the wire format every major provider hands you: Anthropic and OpenAI return `Retry-After: `, Gemini returns `retryDelay: "s"`. Pass the value through. ::: code-group ```js [Express] app.post('/webhook', async (req, res) => { // verifySignature → see https://docs.simpleq.io/concepts/signature-verification // 200 immediately, then process out of band res.status(200).end(); const job = JSON.parse(req.body.toString('utf8')); try { await callAnthropic(job.payload); await callback(job.id, 'ack'); } catch (err) { if (err.status === 429 || err.status === 503) { await callback(job.id, 'defer', { retryAfter: err.headers['retry-after'] ?? 10 }); } else if (err.status === 529) { await callback(job.id, 'defer', { retryAfter: 5, reason: '529 overloaded' }); } else if (err.status >= 400 && err.status < 500) { await callback(job.id, 'nack', { retryable: false }); } else { await callback(job.id, 'nack', { retryable: true }); } } }); ``` ```python [Python (FastAPI)] @app.post("/webhook") async def webhook( request: Request, background_tasks: BackgroundTasks, x_simpleq_signature: str | None = Header(default=None), ): raw = await request.body() # verify before parsing — see https://docs.simpleq.io/concepts/signature-verification if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) # 200 immediately, then process out of band # BackgroundTasks runs in-process — if this worker crashes mid-job the # work is lost, but ackTimeout + ackTimeoutAction: "retry" redelivers it. background_tasks.add_task(process_job, json.loads(raw)) return Response(status_code=200) async def process_job(job: dict) -> None: try: await call_anthropic(job["payload"]) await callback(job["id"], "ack") except APIStatusError as err: status = err.status_code if status in (429, 503): await callback(job["id"], "defer", {"retryAfter": parse_retry_after(err.response.headers.get("retry-after"))}) elif status == 529: await callback(job["id"], "defer", {"retryAfter": 5, "reason": "529 overloaded"}) elif 400 <= status < 500: await callback(job["id"], "nack", {"retryable": False}) else: await callback(job["id"], "nack", {"retryable": True}) ``` ::: A complete handler, with `verifySignature` and `callback` defined, lives in the [generic ack worker example](/examples/generic-ack-worker/). ## Trigger from standard mode In standard mode, your synchronous webhook handler signals backpressure by **returning a 429, 503, or 529** with a `Retry-After` header. SimpleQ reads the header and holds the job for that many seconds — no attempt burned, same effect as the ack-mode `/defer` call. ::: code-group ```js [Express] app.post('/webhook', express.raw({ type: 'application/json' }), async (req, res) => { // verifySignature → see https://docs.simpleq.io/concepts/signature-verification if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } const job = JSON.parse(req.body.toString('utf8')); try { await doTheWork(job.payload); res.status(200).end(); } catch (err) { if (err.status === 429 || err.status === 503) { // Relay the downstream Retry-After (seconds) — SimpleQ honors it. res.set('retry-after', String(err.headers['retry-after'] ?? 30)).status(err.status).end(); } else { res.status(500).end(); } } }); ``` ```python [Python (FastAPI)] @app.post("/webhook") async def webhook(request: Request, x_simpleq_signature: str | None = Header(default=None)): # Verify over the raw bytes BEFORE parsing — see https://docs.simpleq.io/concepts/signature-verification raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) job = json.loads(raw) try: await do_the_work(job["payload"]) return Response(status_code=200) except APIStatusError as err: if err.status_code in (429, 503): # Relay the downstream Retry-After (seconds) — SimpleQ honors it. retry_after = err.response.headers.get("retry-after", "30") return Response(status_code=err.status_code, headers={"retry-after": str(retry_after)}) return Response(status_code=500) ``` ::: If the response omits `Retry-After`, SimpleQ falls back to a 60-second hold. ## 529 (provider overloaded) 529 means "the upstream is over capacity" — not your fault, not billed, no trustworthy `Retry-After`. Defer with a small fixed delay (5–10 seconds is a good default): ```js callback(job.id, 'defer', { retryAfter: 5, reason: 'anthropic 529 overloaded' }); ``` Because defers don't burn attempts, your job rides out the outage and resumes the moment capacity returns. If you want to cap how long that takes — say, fail over to a different region or model after N attempts — count 529s in your worker and switch to `nack` when you hit the limit. The mechanism is yours to compose. ## Sizing `maxAttempts` Because deferred jobs don't count, `maxAttempts` is a budget for **real** failures only — 5xx, network drops, worker crashes. A small `maxAttempts` (3–4) covers a worker that's genuinely broken; the defer mechanism handles backpressure indefinitely without ever touching that budget. The split lets you size each one for what it actually means. ## Let SimpleQ own the backoff When you relay `Retry-After` to `/defer` (or return it from a standard-mode webhook), SimpleQ becomes the single source of truth for retry timing: - One backoff strategy across all jobs and queues, not whatever each SDK ships with this month. - Rate-aware redelivery — deferred jobs respect the queue's `concurrency` and `rateLimitMax` when they come back, so a burst of recovered jobs doesn't immediately re-saturate the downstream. - Visibility — every defer is recorded in the job's history, so you can see what backed off and why. To get this, set `maxRetries: 0` (or the equivalent) when constructing your provider SDK client. The provider's `Retry-After` then surfaces to your handler, you pass it to SimpleQ, and the queue handles the rest. --- ## Comparison Source: https://docs.simpleq.io/concepts/comparison/ How SimpleQ compares to other queue and task-processing tools. SimpleQ is a managed queue built for AI-heavy backends and API-dependent workloads — it delivers jobs to your webhook with retries, rate limiting, a three-signal ack protocol, and backpressure handling that never wastes your retry budget. ## Quick reference | | SimpleQ | AWS SQS | Cloud Tasks | BullMQ + Redis | Inngest | Trigger.dev | Apache Kafka | RabbitMQ | | --- | --- | --- | --- | --- | --- | --- | --- | --- | | **Category** | Managed queue | Managed queue | Managed queue | Self-hosted queue | Workflow engine | Task orchestration | Event streaming / log | Self-hosted broker | | **Delivery** | Push (webhook) | Pull (polling) | Push (HTTP) | In-process worker | Serverless fn | Managed execution | Pull (consumer polls) | Push/pull (AMQP) | | **Retries** | Configurable backoff | Visibility timeout | Configurable backoff | Configurable backoff | Per-step | Per-task | DIY (retry topics) | Requeue / DLX + TTL | | **Rate limiting** | Per-queue fixed-window | None built-in | Per-queue | Extension (extra Redis) | Per-function | Concurrency control | Client/broker quotas | Prefetch (QoS) | | **Ack mode** | Three-signal: ack / nack / defer | Visibility timeout | No | No | N/A (durable exec) | N/A (managed exec) | Offset commit | Native (ack/nack/reject) | | **Backpressure** | Defer (never burns attempts) | Manual | Retry (burns attempts) | Manual | Step retry | Task retry | Consumer pause/resume | Prefetch + nack/requeue | | **DLQ + replay** | Built-in with API replay | DLQ only (no replay API) | No | Failed job retention | Per-function | Per-task | Dead-letter topic + offset replay | Native DLX (manual replay) | | **Idempotency** | Publish-boundary dedup | Content-based (FIFO only) | Task name dedup | Manual | Event key | Idempotency key | Idempotent producer / EOS | Manual (publisher confirms) | | **Per-job audit** | Full attempt history | CloudWatch aggregates | Cloud Monitoring | Bull Board (if set up) | Per-step logs | Per-task logs | Log offsets (no attempt history) | Mgmt UI stats (no attempt history) | | **Webhook signing** | HMAC-SHA256, per-queue secret | N/A (pull) | OIDC token | N/A (in-process) | Signing key (per-app) | N/A (managed) | N/A (pull, TLS+SASL) | N/A (AMQP, TLS+SASL) | | **AI features** | Anthropic + OpenAI templates, seconds convention, ack-mode escape hatch | None | None | None | None | OpenAI integration | None | None | | **Runs your code?** | No | No | No | Yes (in-process) | Yes (serverless) | Yes (managed) | No | No | | **Workflow engine?** | No | No | No | Flows (basic) | Yes | Yes | No (streaming log) | No | ## What SimpleQ is A managed transport service built for backends that call LLM APIs, deliver webhooks, and sync to third-party APIs. You publish a job via REST API; SimpleQ queues it and POSTs it to your webhook with retries, backoff, rate limiting, and a dead-letter queue. You run the business logic — SimpleQ handles the delivery. ## What SimpleQ is not Not a workflow engine. No durable execution, no step functions, no event coordination. If you need multi-step orchestration, look at Inngest or Trigger.dev. SimpleQ solves the "queue a job and deliver it reliably" problem — and nothing beyond that. ## Key differentiators **Three-signal ack protocol.** Ack (success), nack (failure with a retryable flag), defer (backpressure). Three distinct outcome signals instead of the binary success/fail most queues offer. Once your `/ack`, `/nack`, or `/defer` callback returns 200, the outcome is guaranteed to be acted on. No other managed queue offers this. **Backpressure that never burns retries.** When a downstream returns 429/503/529, the job is deferred and redelivered after the `retryAfter` delay — no attempt counted against `maxAttempts`. A job can be deferred 100 times during a sustained API outage and still complete when capacity returns. The retry budget is reserved for real failures only. **Push delivery.** Unlike SQS (pull-based), SimpleQ POSTs jobs to your webhook. No polling infrastructure, no message-deletion API. Your worker is a plain HTTP endpoint — deploy it anywhere, in any framework, on any cloud. **Queue templates for AI workloads.** `template: "anthropic"` or `template: "openai"` — one field in the create-queue POST configures ack mode, timeout, retry budget, and backoff tuned for each provider's API. Anthropic: 600s timeout for long Claude generations. OpenAI: 300s timeout for faster completions. Both handle rate-limit backpressure via defer. **Publish-boundary idempotency.** Duplicate publish returns the existing job with its canonical ID. Safe-to-retry publishers without client-side dedup tracking. **Per-queue signing secrets.** HMAC-SHA256 with a secret scoped to each queue, not the org. Different teams can own different queues without sharing credentials. If one secret leaks, only that queue is affected. **Per-job attempt history.** Every attempt logged with status, error, webhookStatusCode, and timestamp. Debug attempt 3 of 5 without digging through aggregated metrics. **Seconds convention.** All customer-facing durations are in seconds — matching what Anthropic, OpenAI, and Google return in their `Retry-After` headers. Zero conversion math when relaying backpressure from a downstream provider to SimpleQ's `/defer` endpoint. **DLQ with replay.** Dead-letter queue with single and bulk replay via API. Inspect what failed, fix the root cause, replay — no manual re-publishing. ## Detailed comparisons For deeper trade-off analysis and "when to choose" guidance, see the comparison pages on the marketing site: - [SimpleQ vs AWS SQS](https://simpleq.io/compare/vs-sqs) - [SimpleQ vs Google Cloud Tasks](https://simpleq.io/compare/vs-cloud-tasks) - [SimpleQ vs BullMQ](https://simpleq.io/compare/vs-bullmq) - [SimpleQ vs Inngest](https://simpleq.io/compare/vs-inngest) - [SimpleQ vs Trigger.dev](https://simpleq.io/compare/vs-trigger-dev) - [SimpleQ vs Apache Kafka](https://simpleq.io/compare/vs-kafka) - [SimpleQ vs RabbitMQ](https://simpleq.io/compare/vs-rabbitmq) --- ## Idempotency Source: https://docs.simpleq.io/concepts/idempotency SimpleQ collapses duplicate publishes at the API boundary. Set `idempotencyKey` on publish, and any later publish with the same key on the same queue returns the existing job — same job ID, same payload, no duplicate work queued. Your publisher becomes safe to retry without coordination. ## How to use `idempotencyKey` Pass `idempotencyKey` alongside `payload` on publish: ::: code-group ```bash [cURL] curl -X POST "$SQ_API_URL/v1/queues/my-queue/jobs" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "idempotencyKey": "welcome-email-user-123", "payload": { "userId": "123" } }' ``` ```js [Node] const res = await fetch(`${process.env.SQ_API_URL}/v1/queues/my-queue/jobs`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.SQ_API_KEY}` }, body: JSON.stringify({ idempotencyKey: 'welcome-email-user-123', payload: { userId: '123' }, }), }); const job = await res.json(); ``` ```python [Python] import os, httpx res = httpx.post( f"{os.environ['SQ_API_URL']}/v1/queues/my-queue/jobs", headers={"authorization": f"Bearer {os.environ['SQ_API_KEY']}"}, json={ "idempotencyKey": "welcome-email-user-123", "payload": {"userId": "123"}, }, ) job = res.json() ``` ::: Use a key that's unique to the **business action** you're triggering: - `welcome-email-user-{userId}` — sending the welcome email exactly once per user - `summarize-doc-{docId}-v{revision}` — summarizing a specific document revision - `daily-digest-{userId}-{YYYY-MM-DD}` — daily-cadence work The response always returns the canonical job (the first one, on a duplicate publish), so you can read the job ID and poll its status without needing to track dedup state on your end. ## Why this matters at the boundary A reliable publisher is one that can safely retry: a network blip, an app restart mid-request, an ambiguous timeout. With `idempotencyKey` in place, every one of those retries is harmless — SimpleQ collapses the duplicates and returns the existing job. Your client code doesn't need to remember what it already sent, and you never accidentally fire the same business action twice because the network burped. ## Reliable transport, so your product can move The publish boundary is where reliability has to start: if the entry point isn't safe to retry, nothing downstream can recover cleanly. With `idempotencyKey` collapsing duplicates and durable delivery handling everything that comes after — retries on failure, defer on backpressure, dead-letter as a last resort — your publisher and your workers both have a stable, predictable foundation. You spend your time on what each job *does*, not on building dedup layers around the transport. --- ## Signature verification Source: https://docs.simpleq.io/concepts/signature-verification Every webhook SimpleQ sends carries an `x-simpleq-signature` header containing an HMAC-SHA256 of the raw request body, keyed with your queue's `signingSecret`. Verify it on every request before processing — the signature is what proves the webhook actually came from SimpleQ, not from anyone who happened to learn your webhook URL. This is the same verification model used by Stripe, GitHub, Slack, and most major webhook providers — so the pattern below should look familiar. ## The header ``` x-simpleq-signature: sha256= ``` The value is `sha256=` followed by the lowercase hex digest of `HMAC-SHA256(rawBody, signingSecret)`. The signature is computed over the raw request body as transmitted, so verification reads the same raw bytes. ## Where to find `signingSecret` Each queue has its own `signingSecret`. That scoping is intentional: a worker only needs the secrets for the queue(s) it handles, and if one secret is ever exposed, only that queue is affected — every other queue's signatures stay valid and unbreakable. It also means different teams can own different queues without sharing credentials. The secret is generated when the queue is created and shown to you once at that moment — capture it then. There are two paths: - **Dashboard** — when you click *Create queue* in the dashboard, the next screen shows the signing secret with a Copy button and the message "copy it now — it won't be shown again." Copy it into your worker's environment (env var, secrets manager) before navigating away. - **API** — the `POST /v1/queues` response includes `signingSecret` in the JSON body. Read it from the response and store it. After that initial reveal, the secret is no longer surfaced by `GET /v1/queues` or `GET /v1/queues/:id` — it stays in your secrets store, where it can do its job without re-exposure. ## Verifying in Express Register `express.raw({ type: 'application/json' })` on the webhook route specifically. The handler receives `req.body` as a `Buffer` (the raw bytes), which is what the signature was computed against. This is the same shape Stripe's Node webhook quickstart uses: ```js import express from 'express'; import crypto from 'node:crypto'; const app = express(); const SIGNING_SECRET = process.env.SQ_SIGNING_SECRET; function verifySignature(rawBody, header) { if (!header) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } app.post('/webhook', express.raw({ type: 'application/json' }), (req, res) => { if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } const job = JSON.parse(req.body.toString('utf8')); // ... handle the job ... res.status(200).end(); }, ); // Other routes can use global JSON parsing as normal. app.use(express.json()); ``` Two details worth knowing: 1. **`crypto.timingSafeEqual`** — comparing HMACs with `===` leaks information about which byte differed via timing. `timingSafeEqual` compares in constant time. Always use it for signature checks. 2. **Length check before `timingSafeEqual`** — `timingSafeEqual` throws if the two buffers have different lengths. The length check short-circuits cleanly when the header is malformed or missing. ## Verifying in FastAPI (Python) The trap to avoid in FastAPI is the same one Express's `express.raw()` sidesteps: you must verify against the *raw* request bytes. Declaring a Pydantic model (or calling `await request.json()`) hands you re-serialized data — whitespace, key order, and number formatting may all differ from what SimpleQ signed, so the HMAC won't match. Read the body with `await request.body()` first, verify against those exact bytes, and only then parse: ```python import hashlib import hmac import json import os from fastapi import FastAPI, Header, Request, Response app = FastAPI() SIGNING_SECRET = os.environ["SQ_SIGNING_SECRET"] def verify_signature(raw_body: bytes, header: str | None) -> bool: if not header: return False expected = "sha256=" + hmac.new( SIGNING_SECRET.encode(), raw_body, hashlib.sha256 ).hexdigest() # Constant-time compare; tolerates differing lengths without throwing. return hmac.compare_digest(expected, header) @app.post("/webhook") async def webhook( request: Request, x_simpleq_signature: str | None = Header(default=None), ): # Read the raw bytes BEFORE any parsing — this is what the signature # was computed over. Do not declare a Pydantic body param here. raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) job = json.loads(raw) # ... handle the job ... return Response(status_code=200) ``` Two details worth knowing: 1. **`hmac.compare_digest`** — comparing HMACs with `==` leaks information about which byte differed via timing. `compare_digest` compares in constant time. Always use it for signature checks (it's Python's equivalent of Node's `crypto.timingSafeEqual`). 2. **Don't verify against a parsed body** — if you let FastAPI parse the JSON (a Pydantic model param, or `await request.json()`) and then re-serialize it to compute the HMAC, the bytes won't match what SimpleQ signed. Compute the HMAC over the original `raw` bytes, then `json.loads(raw)` for your own use. ## On a failed signature: return 401 and stop If the signature doesn't match, return `401` and do not process the body. A failed signature means one of: - The request didn't come from SimpleQ. - The body was modified in transit. - Your `signingSecret` doesn't match the queue's current secret. In all three cases, processing the payload would be unsafe. SimpleQ won't treat a 401 as job failure — it indicates an authentication problem on the receiver, not a job-level outcome. ## How this fits the trust model The signing secret is the contract: SimpleQ holds it, you hold it, and no one else can forge a message that verifies. As long as your handler checks `x-simpleq-signature` on every request and `signingSecret` stays confidential, the only requests your worker ever processes are real SimpleQ deliveries — verified for both authenticity (it came from us) and integrity (the body wasn't tampered with in transit). --- ## Webhook delivery Source: https://docs.simpleq.io/concepts/webhook-delivery When a job is ready, SimpleQ makes an HTTP `POST` to the `webhookUrl` you configured on the queue. This page is the authoritative contract for that request: the method, the headers, and the exact body your endpoint receives. Your application data is delivered **inside an envelope**, not as the raw request body. SimpleQ wraps your `payload` together with delivery metadata (job id, attempt counters, timestamps), so your handler reads `body.payload`, not `body` directly. ## The request ``` POST Content-Type: application/json x-simpleq-signature: sha256= ``` - `x-simpleq-signature` — HMAC-SHA256 of the raw body, keyed with the queue's `signingSecret`. Verify it on every request before processing. See [Signature verification](/concepts/signature-verification). - The connection has a **15-second timeout** in standard mode. Standard mode is for fast handlers; work that can exceed 15s belongs in [ack mode](/concepts/ack-mode). ## The body The body is a single JSON object — the delivery envelope: ```json { "id": "job_abc123", "queue": "ai-jobs", "payload": { "model": "gpt-4o-mini", "messages": [{ "role": "user", "content": "..." }] }, "attempt": 1, "maxAttempts": 5, "createdAt": "2025-01-15T10:30:00.000Z" } ``` | Field | Type | Meaning | | --- | --- | --- | | `id` | string | The job ID. Use it for the ack-mode callbacks (`POST /v1/jobs/:id/ack` \| `/nack` \| `/defer`) and as a stable dedupe key for [idempotent processing](/concepts/idempotency). | | `queue` | string | The queue this job belongs to. Useful when one worker endpoint serves multiple queues. | | `payload` | object | Your data, exactly as published — delivered verbatim. **This is where your application data lives.** | | `attempt` | number | Which delivery attempt this is, starting at `1`. | | `maxAttempts` | number | The queue's configured maximum attempts. When `attempt === maxAttempts`, a non-2xx response sends the job to the dead-letter queue (if enabled). | | `createdAt` | string | ISO-8601 timestamp of when the job was originally published. Use it to detect stale work. | ## Your response decides what happens next | You return | SimpleQ does | | --- | --- | | `2xx` within 15s | **Standard mode:** the job is marked completed. **Ack mode:** the job moves to `awaiting_ack` and waits for your `/ack`, `/nack`, or `/defer` callback — a `2xx` only confirms receipt, not completion. See [Ack mode](/concepts/ack-mode). | | `429`, `503`, or `529` | Treated as **backpressure**: SimpleQ backs off (honoring a `Retry-After` header if present) and redelivers **without burning an attempt**. See [Backpressure](/concepts/backpressure). | | Any other non-2xx, or no response within 15s | Counts as a **failed attempt**. SimpleQ retries with backoff up to `maxAttempts`, then dead-letters the job (if the DLQ is enabled). | ## A minimal handler Verify the signature, parse the envelope, read `payload`, return `2xx`: ::: code-group ```js [Express] import express from 'express'; import crypto from 'node:crypto'; const app = express(); const SIGNING_SECRET = process.env.SQ_SIGNING_SECRET; function verifySignature(rawBody, header) { if (!header) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } // Raw body is required so the signature is verified against the exact bytes sent. app.post('/webhook', express.raw({ type: 'application/json' }), (req, res) => { if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } const job = JSON.parse(req.body.toString('utf8')); // job.id, job.queue, job.attempt, job.maxAttempts, job.createdAt const isLastAttempt = job.attempt === job.maxAttempts; doWork(job.payload); // your data lives under .payload res.status(200).end(); }); ``` ```python [Python (FastAPI)] import hashlib import hmac import json import os from fastapi import FastAPI, Header, Request, Response app = FastAPI() SIGNING_SECRET = os.environ["SQ_SIGNING_SECRET"] def verify_signature(raw_body: bytes, header: str | None) -> bool: if not header: return False expected = "sha256=" + hmac.new(SIGNING_SECRET.encode(), raw_body, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, header) @app.post("/webhook") async def webhook(request: Request, x_simpleq_signature: str | None = Header(default=None)): # Read the raw bytes BEFORE parsing — that's what the signature was computed over. raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) job = json.loads(raw) # job["id"], job["queue"], job["attempt"], job["maxAttempts"], job["createdAt"] is_last_attempt = job["attempt"] == job["maxAttempts"] do_work(job["payload"]) # your data lives under "payload" return Response(status_code=200) ``` ::: For complete, copy-paste handlers across Node (Express), Next.js, Python (FastAPI), Go, Java, PHP, and C#/.NET — plus rate-limit handling — see the [examples](/examples/). ## Source of truth The envelope is the `WebhookPayload` type in the SimpleQ platform; this page mirrors it field-for-field. If you ever see a field here that your handler doesn't expect, prefer the live delivery — and the field list above stays in lockstep with the type. --- # Examples ## Anthropic queue Source: https://docs.simpleq.io/examples/anthropic-queue/ A queue pre-configured for Anthropic API workloads, plus a reference worker. The template encodes everything universally true about a Claude workload — ack mode, long `ackTimeout`, retry budget that doesn't get burned by rate limits — so you don't have to choose queue config. The worker translates Anthropic's error signals (`Retry-After`, 529 overload) into SimpleQ's `ack` / `nack` / `defer` callbacks. Together: rate-aware Claude jobs in three POST requests. ::: tip Prerequisite This example uses an [Anthropic API key](https://console.anthropic.com/account/keys) and a SimpleQ API key (create one on the dashboard's API Keys page). ::: ## Create the queue ::: code-group ```bash [cURL] curl -X POST "$SQ_API_URL/v1/queues" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "template": "anthropic", "name": "claude", "webhookUrl": "https://your-worker.example.com/webhook" }' ``` ```js [Node] const res = await fetch(`${process.env.SQ_API_URL}/v1/queues`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.SQ_API_KEY}` }, body: JSON.stringify({ template: 'anthropic', name: 'claude', webhookUrl: 'https://your-worker.example.com/webhook', }), }); const queue = await res.json(); ``` ```python [Python] import os, httpx res = httpx.post( f"{os.environ['SQ_API_URL']}/v1/queues", headers={"authorization": f"Bearer {os.environ['SQ_API_KEY']}"}, json={ "template": "anthropic", "name": "claude", "webhookUrl": "https://your-worker.example.com/webhook", }, ) queue = res.json() ``` ::: Three fields. `template: "anthropic"` applies the Anthropic defaults; `name` and `webhookUrl` are yours. Override any default by passing it alongside `template` (e.g. `"concurrency": 50` to raise from the default of 20). ::: details What this template expands to | Field | Value | Why | | --- | --- | --- | | `mode` | `"ack"` | Claude calls routinely exceed the 15s webhook ceiling. Ack mode lets the worker return 200 immediately and report the real outcome via `/ack`, `/nack`, or `/defer`. | | `maxAttempts` | `4` | Budgets only genuine failures. 429/503/529 are deferred and don't count against this. | | `concurrency` | `20` | How many in-flight Claude calls at once. Throughput ≈ concurrency ÷ average call duration in seconds. Raise to saturate a high tier. | | `ackTimeout` | `600` (10 min) | Sized above a long Claude generation (extended thinking + large output). | | `ackTimeoutAction` | `"retry"` | If the worker dies silently, redeliver. | | `backoffType` / `backoffDelay` | `"exponential"` / `2`s | Spacing for genuine transient failures (`nack` with `retryable: true`). | | `dlqEnabled` | `true` | Exhausted retries or hard nacks land in the DLQ for inspection. | Rate limiting is deliberately not set — that's account-tier-specific. `defer` on 429/503/529 handles real backpressure. If you want a proactive guard, see [Add a proactive RPM guard](#add-a-proactive-rpm-guard) below. ::: ## Publish a job The payload is passed straight to `anthropic.messages.create()` by the worker, so it's just an Anthropic Messages body: ::: code-group ```bash [cURL] curl -X POST "$SQ_API_URL/v1/queues/claude/jobs" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "idempotencyKey": "summarize-doc-4815", "payload": { "model": "claude-sonnet-4-6", "max_tokens": 1024, "messages": [{ "role": "user", "content": "Summarize: ..." }] } }' ``` ```js [Node] const res = await fetch(`${process.env.SQ_API_URL}/v1/queues/claude/jobs`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.SQ_API_KEY}` }, body: JSON.stringify({ idempotencyKey: 'summarize-doc-4815', payload: { model: 'claude-sonnet-4-6', max_tokens: 1024, messages: [{ role: 'user', content: 'Summarize: ...' }], }, }), }); const job = await res.json(); ``` ```python [Python] import os, httpx res = httpx.post( f"{os.environ['SQ_API_URL']}/v1/queues/claude/jobs", headers={"authorization": f"Bearer {os.environ['SQ_API_KEY']}"}, json={ "idempotencyKey": "summarize-doc-4815", "payload": { "model": "claude-sonnet-4-6", "max_tokens": 1024, "messages": [{"role": "user", "content": "Summarize: ..."}], }, }, ) job = res.json() ``` ::: `idempotencyKey` dedupes the publish call — a double-publish returns the existing job. See [Idempotency](/concepts/idempotency) for the broader pattern. ## The worker Same shape as the [generic ack worker](/examples/generic-ack-worker/), specialized to Anthropic's error semantics. Pick the tab for your framework. ::: code-group ```js [Express] // worker.mjs import express from 'express'; import crypto from 'node:crypto'; import Anthropic from '@anthropic-ai/sdk'; const anthropic = new Anthropic({ maxRetries: 0 }); const { SQ_API_URL, SQ_API_KEY, SQ_SIGNING_SECRET } = process.env; const app = express(); function verifySignature(rawBody, header) { if (!header) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SQ_SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } async function callback(jobId, kind, body = {}) { await fetch(`${SQ_API_URL}/v1/jobs/${jobId}/${kind}`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${SQ_API_KEY}` }, body: JSON.stringify(body), }); } app.post('/webhook', express.raw({ type: 'application/json' }), async (req, res) => { if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } res.status(200).end(); const job = JSON.parse(req.body.toString('utf8')); try { const message = await anthropic.messages.create(job.payload); console.log(`[${job.id}] completed: ${message.usage?.output_tokens} output tokens`); await callback(job.id, 'ack'); } catch (err) { const status = err?.status; if (status === 429 || status === 503) { await callback(job.id, 'defer', { retryAfter: parseInt(err.headers?.['retry-after']) || 10, reason: `anthropic ${status}` }); } else if (status === 529) { await callback(job.id, 'defer', { retryAfter: 5, reason: 'anthropic 529 overloaded' }); } else if (status >= 400 && status < 500) { await callback(job.id, 'nack', { retryable: false, reason: `anthropic ${status}` }); } else { await callback(job.id, 'nack', { retryable: true, reason: `anthropic ${status ?? 'network'}` }); } } }); app.use(express.json()); app.listen(9000, () => console.log('anthropic ack worker on :9000')); ``` ```ts [Next.js (App Router)] // app/api/simpleq-webhook/route.ts import crypto from 'node:crypto'; import Anthropic from '@anthropic-ai/sdk'; const anthropic = new Anthropic({ maxRetries: 0 }); const { SQ_API_URL, SQ_API_KEY, SQ_SIGNING_SECRET } = process.env; function verifySignature(rawBody: Buffer, header: string | null): boolean { if (!header || !SQ_SIGNING_SECRET) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SQ_SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } async function callback(jobId: string, kind: string, body: Record = {}) { await fetch(`${SQ_API_URL}/v1/jobs/${jobId}/${kind}`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${SQ_API_KEY}` }, body: JSON.stringify(body), }); } export async function POST(req: Request) { const rawBody = Buffer.from(await req.arrayBuffer()); if (!verifySignature(rawBody, req.headers.get('x-simpleq-signature'))) { return new Response(null, { status: 401 }); } const job = JSON.parse(rawBody.toString('utf8')); const response = new Response(null, { status: 200 }); (async () => { try { const message = await anthropic.messages.create(job.payload); console.log(`[${job.id}] completed: ${message.usage?.output_tokens} output tokens`); await callback(job.id, 'ack'); } catch (err: any) { const status = err?.status; if (status === 429 || status === 503) { await callback(job.id, 'defer', { retryAfter: parseInt(err.headers?.['retry-after']) || 10, reason: `anthropic ${status}` }); } else if (status === 529) { await callback(job.id, 'defer', { retryAfter: 5, reason: 'anthropic 529 overloaded' }); } else if (status >= 400 && status < 500) { await callback(job.id, 'nack', { retryable: false, reason: `anthropic ${status}` }); } else { await callback(job.id, 'nack', { retryable: true, reason: `anthropic ${status ?? 'network'}` }); } } })(); return response; } ``` ```python [Python (FastAPI)] # worker.py import hashlib import hmac import json import os import httpx from anthropic import APIStatusError, AsyncAnthropic from fastapi import BackgroundTasks, FastAPI, Header, Request, Response anthropic = AsyncAnthropic(max_retries=0) SQ_API_URL = os.environ["SQ_API_URL"] SQ_API_KEY = os.environ["SQ_API_KEY"] SQ_SIGNING_SECRET = os.environ["SQ_SIGNING_SECRET"] app = FastAPI() def verify_signature(raw_body: bytes, header: str | None) -> bool: if not header: return False expected = "sha256=" + hmac.new(SQ_SIGNING_SECRET.encode(), raw_body, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, header) def parse_retry_after(value: str | None, fallback: int = 10) -> int: try: return int(value) or fallback except (TypeError, ValueError): return fallback # Reuse one client across callbacks for connection pooling. _client = httpx.AsyncClient(timeout=10.0) async def callback(job_id: str, kind: str, body: dict | None = None) -> None: await _client.post( f"{SQ_API_URL}/v1/jobs/{job_id}/{kind}", headers={"content-type": "application/json", "authorization": f"Bearer {SQ_API_KEY}"}, json=body or {}, ) async def process(job: dict) -> None: try: # AsyncAnthropic so a long generation never blocks the event loop. message = await anthropic.messages.create(**job["payload"]) print(f"[{job['id']}] completed: {message.usage.output_tokens} output tokens") await callback(job["id"], "ack") except APIStatusError as err: status = err.status_code if status in (429, 503): retry_after = parse_retry_after(err.response.headers.get("retry-after")) await callback(job["id"], "defer", {"retryAfter": retry_after, "reason": f"anthropic {status}"}) elif status == 529: # overloaded await callback(job["id"], "defer", {"retryAfter": 5, "reason": "anthropic 529 overloaded"}) elif 400 <= status < 500: await callback(job["id"], "nack", {"retryable": False, "reason": f"anthropic {status}"}) else: await callback(job["id"], "nack", {"retryable": True, "reason": f"anthropic {status}"}) except Exception: await callback(job["id"], "nack", {"retryable": True, "reason": "anthropic network"}) @app.post("/webhook") async def webhook( request: Request, background_tasks: BackgroundTasks, x_simpleq_signature: str | None = Header(default=None), ): # Verify over the raw bytes BEFORE parsing. raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) job = json.loads(raw) # BackgroundTasks runs in-process — if this worker crashes mid-job the # work is lost, but ackTimeout + ackTimeoutAction: "retry" redelivers it. background_tasks.add_task(process, job) return Response(status_code=200) # uvicorn worker:app --port 9000 ``` ::: ## Run it ::: code-group ```bash [Node] npm init -y npm i express @anthropic-ai/sdk ANTHROPIC_API_KEY=sk-ant-... \ SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ node worker.mjs ``` ```bash [Python] pip install fastapi uvicorn httpx anthropic ANTHROPIC_API_KEY=sk-ant-... \ SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ uvicorn worker:app --port 9000 ``` ::: Swap `express` (or FastAPI) for your framework of choice. The Anthropic SDK is the same across all tabs. - **`ANTHROPIC_API_KEY`** — from [console.anthropic.com](https://console.anthropic.com/account/keys). - **`SQ_API_URL`** — the SimpleQ API base your worker calls back to: `https://api.simpleq.io`. - **`SQ_API_KEY`** — create one on the dashboard's API Keys page. - **`SQ_SIGNING_SECRET`** — your queue's signing secret. See [Signature verification](/concepts/signature-verification) for where to find it. Point your queue's `webhookUrl` at your worker endpoint (e.g. `http://:9000/webhook`). ## Error routing | Anthropic response | Worker calls | Effect | | --- | --- | --- | | Success | `/ack` | Job → `completed`. | | 429 / 503 (rate-limited / unavailable) | `/defer` with `Retry-After` relayed | Held and redelivered; no attempt burned. | | 529 (overloaded) | `/defer` with 5s fallback | Held and redelivered; no attempt burned. | | 4xx (bad request, auth, not found) | `/nack` `{retryable: false}` | Job → `dead`, lands in DLQ. | | Non-529 5xx, network, SDK timeout | `/nack` `{retryable: true}` | Retried with backoff; counts against `maxAttempts`. | ## Three gotchas worth internalizing **1. Disable the SDK's own retries (`maxRetries: 0`).** The Anthropic SDK retries 429 and ≥500 twice by default with its own backoff. That swallows the error, you never see the `Retry-After`, and you can't relay it to `/defer`. Worse, the SDK's retries and SimpleQ's redelivery fight each other (the classic nested-retry deadlock). With retries off, the 429 surfaces in your `catch`, you read its `Retry-After`, and SimpleQ owns the backoff. The example sets this; don't undo it. **2. 529 never burns an attempt — by design, with a tail.** A 529 is Anthropic being overloaded: not your fault, not billed, no trustworthy `Retry-After`. The worker defers it, so it costs nothing against `maxAttempts`. The tradeoff: during a sustained Anthropic outage the job waits politely rather than dying. If you want a hard ceiling, count 529s in the worker and switch to `nack` after N — or fail over to the same model on Bedrock / Vertex, which run on separate capacity. **3. The template is workload-shaped, not account-shaped.** The defaults encode facts true for any Anthropic workload (ack mode, long `ackTimeout`, retry budget). They don't encode your account's RPM tier — that's account-shaped, not workload-shaped, and tuning is below. ## Add a proactive RPM guard If you know your Anthropic tier and want SimpleQ to space out deliveries below your RPM ceiling, set `rateLimitMax` to ~80% of your RPM and `rateLimitWindow` to 60: ```bash curl -X PUT "$SQ_API_URL/v1/queues/" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "rateLimitMax": 800, "rateLimitWindow": 60 }' ``` Two caveats worth knowing: the limiter is per-worker (N workers running = N × the configured rate), and Anthropic's TPM usually binds before RPM — a request counter can't see token cost. The proactive guard shaves the count of 429s; `defer` is what actually saves you at a real limit. --- ## Generic ack worker Source: https://docs.simpleq.io/examples/generic-ack-worker/ A minimal worker that implements the SimpleQ ack-mode protocol with plain HTTP and no provider SDK. It receives a webhook, verifies the signature, responds 200 immediately, runs the work out of band, and reports back with `ack` / `nack` / `defer`. This is the shortest path to a working ack-mode worker. Drop in your own downstream call and you have the integration. ::: tip Prerequisite This example requires a queue created with `mode: 'ack'`. Standard-mode queues use a different flow — see [Ack mode](/concepts/ack-mode) for how the two differ. ::: ## The worker Pick the tab for your language — each tab is a complete, single-file worker you can copy and run. ::: code-group ```js [Node (Express)] import express from 'express'; import crypto from 'node:crypto'; const { SQ_API_URL, SQ_API_KEY, SQ_SIGNING_SECRET } = process.env; const app = express(); // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. function verifySignature(rawBody, header) { if (!header) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SQ_SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } // Send /ack, /nack, or /defer back to SimpleQ. async function callback(jobId, kind, body = {}) { const res = await fetch(`${SQ_API_URL}/v1/jobs/${jobId}/${kind}`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${SQ_API_KEY}`, }, body: JSON.stringify(body), }); if (!res.ok) { console.error(`[${jobId}] ${kind} failed: ${res.status} ${await res.text()}`); } } // Replace this with your real work. // Throw an error with a numeric `.status` property to route HTTP-shaped failures: // 429 / 503 / 529 → defer (backpressure, no attempt burned) // 4xx → nack non-retryable (hard failure) // 5xx / network → nack retryable (retried with backoff) async function doTheWork(payload) { // your code here } app.post( '/webhook', express.raw({ type: 'application/json' }), async (req, res) => { if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } // Ack mode: 200 immediately, then run the work out of band. res.status(200).end(); const job = JSON.parse(req.body.toString('utf8')); try { await processJob(job); } catch (err) { console.error(`[${job.id}] unhandled:`, err); } }, ); async function processJob(job) { try { await doTheWork(job.payload); await callback(job.id, 'ack'); } catch (err) { const status = err?.status; if (status === 429 || status === 503) { await callback(job.id, 'defer', { retryAfter: parseInt(err.headers?.['retry-after']) || 10, reason: `downstream ${status}`, }); } else if (status === 529) { await callback(job.id, 'defer', { retryAfter: 5, reason: 'downstream 529 overloaded' }); } else if (status >= 400 && status < 500) { await callback(job.id, 'nack', { retryable: false, reason: `downstream ${status}` }); } else { await callback(job.id, 'nack', { retryable: true, reason: `downstream ${status ?? 'network'}: ${err?.message ?? 'unknown'}`, }); } } } // Other routes can use global JSON parsing as normal. app.use(express.json()); app.listen(9000, () => { console.log(`generic ack worker on :9000 → ${SQ_API_URL}`); }); ``` ```python [Python (FastAPI)] import hashlib import hmac import json import os import httpx from fastapi import BackgroundTasks, FastAPI, Header, Request, Response SQ_API_URL = os.environ["SQ_API_URL"] SQ_API_KEY = os.environ["SQ_API_KEY"] SQ_SIGNING_SECRET = os.environ["SQ_SIGNING_SECRET"] app = FastAPI() # Raise this from do_the_work to route HTTP-shaped downstream failures. class DownstreamError(Exception): def __init__(self, status=None, message="", retry_after=None): super().__init__(message or f"downstream {status}") self.status = status self.retry_after = retry_after # Verify the webhook came from SimpleQ. # See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. def verify_signature(raw_body: bytes, header: str | None) -> bool: if not header: return False expected = "sha256=" + hmac.new(SQ_SIGNING_SECRET.encode(), raw_body, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, header) # Send /ack, /nack, or /defer back to SimpleQ. # Reuse one client across callbacks for connection pooling. _client = httpx.AsyncClient(timeout=10.0) async def callback(job_id: str, kind: str, body: dict | None = None) -> None: res = await _client.post( f"{SQ_API_URL}/v1/jobs/{job_id}/{kind}", headers={ "content-type": "application/json", "authorization": f"Bearer {SQ_API_KEY}", }, json=body or {}, ) if res.is_error: print(f"[{job_id}] {kind} failed: {res.status_code} {res.text}") # Replace this with your real work. # Raise DownstreamError with a numeric `status` to route HTTP-shaped failures: # 429 / 503 / 529 → defer (backpressure, no attempt burned) # 4xx → nack non-retryable (hard failure) # 5xx / network → nack retryable (retried with backoff) async def do_the_work(payload: dict) -> None: # your code here pass @app.post("/webhook") async def webhook( request: Request, background_tasks: BackgroundTasks, x_simpleq_signature: str | None = Header(default=None), ): # Read the raw body BEFORE parsing — the HMAC is over the exact bytes. raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) # Ack mode: 200 immediately, then run the work out of band. job = json.loads(raw) # BackgroundTasks runs in-process — if this worker crashes mid-job the # work is lost, but ackTimeout + ackTimeoutAction: "retry" redelivers it. background_tasks.add_task(process_job, job) return Response(status_code=200) async def process_job(job: dict) -> None: try: await do_the_work(job["payload"]) await callback(job["id"], "ack") except DownstreamError as err: status = err.status if status == 429 or status == 503: await callback(job["id"], "defer", { "retryAfter": err.retry_after or 10, "reason": f"downstream {status}", }) elif status == 529: await callback(job["id"], "defer", {"retryAfter": 5, "reason": "downstream 529 overloaded"}) elif status is not None and 400 <= status < 500: await callback(job["id"], "nack", {"retryable": False, "reason": f"downstream {status}"}) else: await callback(job["id"], "nack", { "retryable": True, "reason": f"downstream {status or 'network'}: {err}", }) except Exception as err: # Any non-HTTP-shaped error → retry with backoff. await callback(job["id"], "nack", {"retryable": True, "reason": f"downstream network: {err}"}) # Run with: uvicorn worker:app --port 9000 ``` ```go [Go (net/http)] package main import ( "bytes" "crypto/hmac" "crypto/sha256" "encoding/hex" "encoding/json" "fmt" "io" "log" "net/http" "os" "strconv" ) var ( sqAPIURL = os.Getenv("SQ_API_URL") sqAPIKey = os.Getenv("SQ_API_KEY") sqSigningSecret = os.Getenv("SQ_SIGNING_SECRET") ) // A SimpleQ job as delivered to your webhook. Your data is under Payload. type Job struct { ID string `json:"id"` Queue string `json:"queue"` Payload json.RawMessage `json:"payload"` Attempt int `json:"attempt"` MaxAttempts int `json:"maxAttempts"` CreatedAt string `json:"createdAt"` } // Return this from doTheWork to route HTTP-shaped downstream failures. type downstreamError struct { status int // HTTP status from the downstream call (0 = network) message string retryAfter int // seconds; 0 = none } func (e *downstreamError) Error() string { if e.message != "" { return e.message } return fmt.Sprintf("downstream %d", e.status) } // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. func verifySignature(rawBody []byte, header string) bool { if header == "" { return false } mac := hmac.New(sha256.New, []byte(sqSigningSecret)) mac.Write(rawBody) expected := "sha256=" + hex.EncodeToString(mac.Sum(nil)) return hmac.Equal([]byte(expected), []byte(header)) } // Send /ack, /nack, or /defer back to SimpleQ. func callback(jobID, kind string, body map[string]any) { if body == nil { body = map[string]any{} } buf, _ := json.Marshal(body) req, _ := http.NewRequest("POST", fmt.Sprintf("%s/v1/jobs/%s/%s", sqAPIURL, jobID, kind), bytes.NewReader(buf)) req.Header.Set("content-type", "application/json") req.Header.Set("authorization", "Bearer "+sqAPIKey) res, err := http.DefaultClient.Do(req) if err != nil { log.Printf("[%s] %s failed: %v", jobID, kind, err) return } defer res.Body.Close() if res.StatusCode >= 400 { msg, _ := io.ReadAll(res.Body) log.Printf("[%s] %s failed: %d %s", jobID, kind, res.StatusCode, msg) } } // Replace this with your real work. // Return a *downstreamError with a numeric status to route HTTP-shaped failures: // 429 / 503 / 529 → defer (backpressure, no attempt burned) // 4xx → nack non-retryable (hard failure) // 5xx / network → nack retryable (retried with backoff) func doTheWork(payload json.RawMessage) error { // your code here return nil } func webhook(w http.ResponseWriter, r *http.Request) { // Read the raw body BEFORE parsing — the HMAC is over the exact bytes. raw, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } if !verifySignature(raw, r.Header.Get("x-simpleq-signature")) { w.WriteHeader(http.StatusUnauthorized) return } var job Job if err := json.Unmarshal(raw, &job); err != nil { w.WriteHeader(http.StatusBadRequest) return } // Ack mode: 200 immediately, then run the work out of band. w.WriteHeader(http.StatusOK) go processJob(job) } func processJob(job Job) { err := doTheWork(job.Payload) if err == nil { callback(job.ID, "ack", nil) return } de, ok := err.(*downstreamError) if !ok { // Any non-HTTP-shaped error → retry with backoff. callback(job.ID, "nack", map[string]any{"retryable": true, "reason": "downstream network: " + err.Error()}) return } status := de.status switch { case status == 429 || status == 503: retryAfter := de.retryAfter if retryAfter == 0 { retryAfter = 10 } callback(job.ID, "defer", map[string]any{ "retryAfter": retryAfter, "reason": fmt.Sprintf("downstream %d", status), }) case status == 529: callback(job.ID, "defer", map[string]any{"retryAfter": 5, "reason": "downstream 529 overloaded"}) case status >= 400 && status < 500: callback(job.ID, "nack", map[string]any{"retryable": false, "reason": fmt.Sprintf("downstream %d", status)}) default: label := strconv.Itoa(status) if status == 0 { label = "network" } callback(job.ID, "nack", map[string]any{ "retryable": true, "reason": fmt.Sprintf("downstream %s: %s", label, de.Error()), }) } } func main() { http.HandleFunc("/webhook", webhook) log.Printf("generic ack worker on :9000 → %s", sqAPIURL) log.Fatal(http.ListenAndServe(":9000", nil)) } ``` ```java [Java (Spring Boot)] // Spring Boot single-file ack worker. Deps (Maven): // org.springframework.boot:spring-boot-starter-web // com.fasterxml.jackson.core:jackson-databind (bundled with starter-web) // Java 17+ (uses java.net.http.HttpClient). package com.example.simpleq; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.LinkedHashMap; import java.util.Map; @SpringBootApplication @EnableAsync public class Worker { public static void main(String[] args) { // This worker example listens on port 9000 (set server.port=9000). SpringApplication.run(Worker.class, args); System.out.println("generic ack worker on :9000 -> " + System.getenv("SQ_API_URL")); } } @RestController class WebhookController { static final String SQ_SIGNING_SECRET = System.getenv("SQ_SIGNING_SECRET"); static final ObjectMapper MAPPER = new ObjectMapper(); private final JobProcessor processor; WebhookController(JobProcessor processor) { this.processor = processor; } // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. boolean verifySignature(byte[] rawBody, String header) { if (header == null) return false; String expected = "sha256=" + hmacHex(rawBody); // Constant-time compare over the raw bytes. return MessageDigest.isEqual( expected.getBytes(StandardCharsets.UTF_8), header.getBytes(StandardCharsets.UTF_8)); } static String hmacHex(byte[] rawBody) { try { Mac mac = Mac.getInstance("HmacSHA256"); mac.init(new SecretKeySpec(SQ_SIGNING_SECRET.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); byte[] digest = mac.doFinal(rawBody); StringBuilder sb = new StringBuilder(digest.length * 2); for (byte b : digest) sb.append(String.format("%02x", b)); return sb.toString(); } catch (Exception e) { throw new RuntimeException(e); } } @PostMapping(value = "/webhook", consumes = MediaType.ALL_VALUE) ResponseEntity webhook( @RequestBody(required = false) byte[] rawBody, @RequestHeader(value = "x-simpleq-signature", required = false) String signature) { // Read the raw body BEFORE parsing -- the HMAC is over the exact bytes. byte[] raw = rawBody == null ? new byte[0] : rawBody; if (!verifySignature(raw, signature)) { return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build(); } // Ack mode: 200 immediately, then run the work out of band. JsonNode job; try { job = MAPPER.readTree(raw); } catch (Exception e) { return ResponseEntity.status(HttpStatus.BAD_REQUEST).build(); } // Call through the injected proxy bean so @Async actually takes effect // (a same-class self-invocation would bypass the proxy and run synchronously). processor.processJob(job); return ResponseEntity.ok().build(); } } @Component class JobProcessor { static final String SQ_API_URL = System.getenv("SQ_API_URL"); static final String SQ_API_KEY = System.getenv("SQ_API_KEY"); static final ObjectMapper MAPPER = new ObjectMapper(); static final HttpClient HTTP = HttpClient.newHttpClient(); // Raise this from doTheWork to route HTTP-shaped downstream failures. static class DownstreamError extends RuntimeException { final Integer status; final Integer retryAfter; DownstreamError(Integer status, String message, Integer retryAfter) { super(message != null && !message.isEmpty() ? message : "downstream " + status); this.status = status; this.retryAfter = retryAfter; } } // Send /ack, /nack, or /defer back to SimpleQ. void callback(String jobId, String kind, Map body) { try { String json = MAPPER.writeValueAsString(body == null ? Map.of() : body); HttpRequest req = HttpRequest.newBuilder() .uri(URI.create(SQ_API_URL + "/v1/jobs/" + jobId + "/" + kind)) .header("content-type", "application/json") .header("authorization", "Bearer " + SQ_API_KEY) .POST(HttpRequest.BodyPublishers.ofString(json)) .build(); HttpResponse res = HTTP.send(req, HttpResponse.BodyHandlers.ofString()); if (res.statusCode() >= 400) { System.err.printf("[%s] %s failed: %d %s%n", jobId, kind, res.statusCode(), res.body()); } } catch (Exception err) { System.err.printf("[%s] %s failed: %s%n", jobId, kind, err.getMessage()); } } // Replace this with your real work. // Throw DownstreamError with a numeric `status` to route HTTP-shaped failures: // 429 / 503 / 529 -> defer (backpressure, no attempt burned) // 4xx -> nack non-retryable (hard failure) // 5xx / network -> nack retryable (retried with backoff) void doTheWork(JsonNode payload) { // your code here } // @Async runs the work on a separate thread so the 200 returns first. @Async void processJob(JsonNode job) { String jobId = job.path("id").asText(); try { doTheWork(job.path("payload")); callback(jobId, "ack", Map.of()); } catch (DownstreamError err) { Integer status = err.status; if (status != null && (status == 429 || status == 503)) { Map body = new LinkedHashMap<>(); body.put("retryAfter", err.retryAfter != null ? err.retryAfter : 10); body.put("reason", "downstream " + status); callback(jobId, "defer", body); } else if (status != null && status == 529) { Map body = new LinkedHashMap<>(); body.put("retryAfter", 5); body.put("reason", "downstream 529 overloaded"); callback(jobId, "defer", body); } else if (status != null && status >= 400 && status < 500) { Map body = new LinkedHashMap<>(); body.put("retryable", false); body.put("reason", "downstream " + status); callback(jobId, "nack", body); } else { Map body = new LinkedHashMap<>(); body.put("retryable", true); body.put("reason", "downstream " + (status != null ? status : "network") + ": " + err.getMessage()); callback(jobId, "nack", body); } } catch (Exception err) { // Any non-HTTP-shaped error -> retry with backoff. Map body = new LinkedHashMap<>(); body.put("retryable", true); body.put("reason", "downstream network: " + err.getMessage()); callback(jobId, "nack", body); } } } ``` ```php [PHP] status = $status; $this->retry_after = $retry_after; } } // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. function verify_signature($raw_body, $header) { global $SQ_SIGNING_SECRET; if (!$header) return false; $expected = 'sha256=' . hash_hmac('sha256', $raw_body, $SQ_SIGNING_SECRET); // hash_equals is constant-time — never use == on a signature. return hash_equals($expected, $header); } // Send /ack, /nack, or /defer back to SimpleQ. function callback($job_id, $kind, $body = []) { global $SQ_API_URL, $SQ_API_KEY; $ch = curl_init("$SQ_API_URL/v1/jobs/$job_id/$kind"); curl_setopt($ch, CURLOPT_POST, true); curl_setopt($ch, CURLOPT_HTTPHEADER, [ 'content-type: application/json', "authorization: Bearer $SQ_API_KEY", ]); // empty body must still be an object: {} not [] curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode((object) $body)); curl_setopt($ch, CURLOPT_RETURNTRANSFER, true); $res = curl_exec($ch); $code = curl_getinfo($ch, CURLINFO_RESPONSE_CODE); curl_close($ch); if ($code < 200 || $code >= 300) { error_log("[$job_id] $kind failed: $code $res"); } } // Replace this with your real work. // Throw DownstreamError with a numeric `status` to route HTTP-shaped failures: // 429 / 503 / 529 → defer (backpressure, no attempt burned) // 4xx → nack non-retryable (hard failure) // 5xx / network → nack retryable (retried with backoff) function do_the_work($payload) { // your code here } // Read the raw body BEFORE parsing — the HMAC is over the exact bytes. $raw = file_get_contents('php://input'); $header = $_SERVER['HTTP_X_SIMPLEQ_SIGNATURE'] ?? null; if (!verify_signature($raw, $header)) { http_response_code(401); exit; } // Ack mode: 200 immediately, then run the work out of band. http_response_code(200); // KEY PHP GOTCHA: under PHP-FPM, fastcgi_finish_request() flushes the 200 // response to SimpleQ and closes the connection NOW, while this script keeps // running. Without it the response would block on do_the_work below and could // blow the 15s ack ceiling. (On non-FPM SAPIs this function is unavailable — // run worker.php under PHP-FPM for true out-of-band processing.) if (function_exists('fastcgi_finish_request')) { fastcgi_finish_request(); } $job = json_decode($raw, true); try { do_the_work($job['payload']); callback($job['id'], 'ack'); } catch (DownstreamError $err) { $status = $err->status; if ($status === 429 || $status === 503) { callback($job['id'], 'defer', [ 'retryAfter' => $err->retry_after ?: 10, 'reason' => "downstream $status", ]); } elseif ($status === 529) { callback($job['id'], 'defer', ['retryAfter' => 5, 'reason' => 'downstream 529 overloaded']); } elseif ($status !== null && $status >= 400 && $status < 500) { callback($job['id'], 'nack', ['retryable' => false, 'reason' => "downstream $status"]); } else { callback($job['id'], 'nack', [ 'retryable' => true, 'reason' => 'downstream ' . ($status ?? 'network') . ': ' . $err->getMessage(), ]); } } catch (Throwable $err) { // Any non-HTTP-shaped error → retry with backoff. callback($job['id'], 'nack', ['retryable' => true, 'reason' => 'downstream network: ' . $err->getMessage()]); } ``` ```csharp [C#/.NET (ASP.NET Core)] using System.Security.Cryptography; using System.Text; using System.Text.Json; var SQ_API_URL = Environment.GetEnvironmentVariable("SQ_API_URL")!; var SQ_API_KEY = Environment.GetEnvironmentVariable("SQ_API_KEY")!; var SQ_SIGNING_SECRET = Environment.GetEnvironmentVariable("SQ_SIGNING_SECRET")!; var http = new HttpClient(); var builder = WebApplication.CreateBuilder(args); var app = builder.Build(); // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. bool VerifySignature(byte[] rawBody, string? header) { if (string.IsNullOrEmpty(header)) return false; using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(SQ_SIGNING_SECRET)); var digest = Convert.ToHexString(hmac.ComputeHash(rawBody)).ToLowerInvariant(); var expected = Encoding.UTF8.GetBytes("sha256=" + digest); var actual = Encoding.UTF8.GetBytes(header); // Constant-time compare; FixedTimeEquals also handles the length check. return CryptographicOperations.FixedTimeEquals(expected, actual); } // Send /ack, /nack, or /defer back to SimpleQ. async Task Callback(string jobId, string kind, object? body = null) { var req = new HttpRequestMessage(HttpMethod.Post, $"{SQ_API_URL}/v1/jobs/{jobId}/{kind}") { Content = new StringContent(JsonSerializer.Serialize(body ?? new { }), Encoding.UTF8, "application/json"), }; req.Headers.Add("Authorization", $"Bearer {SQ_API_KEY}"); var res = await http.SendAsync(req); if (!res.IsSuccessStatusCode) Console.Error.WriteLine($"[{jobId}] {kind} failed: {(int)res.StatusCode} {await res.Content.ReadAsStringAsync()}"); } // Replace this with your real work. // Throw DownstreamException with a numeric `Status` to route HTTP-shaped failures: // 429 / 503 / 529 → defer (backpressure, no attempt burned) // 4xx → nack non-retryable (hard failure) // 5xx / network → nack retryable (retried with backoff) async Task DoTheWork(JsonElement payload) { // your code here await Task.CompletedTask; } async Task ProcessJob(JsonElement job) { var jobId = job.GetProperty("id").GetString()!; try { await DoTheWork(job.GetProperty("payload")); await Callback(jobId, "ack"); } catch (DownstreamException err) { var status = err.Status; if (status == 429 || status == 503) await Callback(jobId, "defer", new { retryAfter = err.RetryAfter ?? 10, reason = $"downstream {status}" }); else if (status == 529) await Callback(jobId, "defer", new { retryAfter = 5, reason = "downstream 529 overloaded" }); else if (status is >= 400 and < 500) await Callback(jobId, "nack", new { retryable = false, reason = $"downstream {status}" }); else await Callback(jobId, "nack", new { retryable = true, reason = $"downstream {(status?.ToString() ?? "network")}: {err.Message}" }); } catch (Exception err) { // Any non-HTTP-shaped error → retry with backoff. await Callback(jobId, "nack", new { retryable = true, reason = $"downstream network: {err.Message}" }); } } app.MapPost("/webhook", async (HttpRequest request) => { // Read the raw body BEFORE parsing — the HMAC is over the exact bytes. using var ms = new MemoryStream(); await request.Body.CopyToAsync(ms); var raw = ms.ToArray(); var header = request.Headers["x-simpleq-signature"].FirstOrDefault(); if (!VerifySignature(raw, header)) return Results.Unauthorized(); // Ack mode: 200 immediately, then run the work out of band. var job = JsonSerializer.Deserialize(raw); // Fire-and-forget: survives the response but NOT a process crash — // ackTimeout + ackTimeoutAction: "retry" redelivers anything lost. _ = Task.Run(async () => { try { await ProcessJob(job); } catch (Exception err) { Console.Error.WriteLine($"[{job.GetProperty("id").GetString()}] unhandled: {err}"); } }); return Results.Ok(); }); Console.WriteLine($"generic ack worker on :9000 → {SQ_API_URL}"); app.Run("http://0.0.0.0:9000"); // Raise this from DoTheWork to route HTTP-shaped downstream failures. // NOTE: type declarations must come AFTER all top-level statements. class DownstreamException : Exception { public int? Status { get; } public int? RetryAfter { get; } public DownstreamException(int? status = null, string message = "", int? retryAfter = null) : base(string.IsNullOrEmpty(message) ? $"downstream {status}" : message) { Status = status; RetryAfter = retryAfter; } } ``` ::: ## Run it ::: code-group ```bash [Node] npm init -y npm i express SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ node worker.mjs ``` ```bash [Python] pip install fastapi uvicorn httpx SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ uvicorn worker:app --port 9000 ``` ```bash [Go] SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ go run worker.go ``` ```bash [Java] # spring-boot-starter-web on the classpath; server.port=9000 SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ ./mvnw spring-boot:run -Dspring-boot.run.arguments=--server.port=9000 ``` ```bash [PHP] # True out-of-band processing needs PHP-FPM behind nginx/Caddy (so # fastcgi_finish_request() exists). For a quick local test: SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ php -S 0.0.0.0:9000 worker.php ``` ```bash [C#/.NET] SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ dotnet run ``` ::: - **`SQ_API_URL`** — the SimpleQ API base your worker calls back to: `https://api.simpleq.io`. - **`SQ_API_KEY`** — create one on the dashboard's **API Keys** page. - **`SQ_SIGNING_SECRET`** — your queue's signing secret. See [Signature verification](/concepts/signature-verification) for where to find it (dashboard reveal or `POST /v1/queues` response, depending on how the queue was created). This worker example listens on port 9000, point your queue's `webhookUrl` to your webhook: `http://:9000/webhook`. ## What each piece does | Piece | Concept | | --- | --- | | Verifying `x-simpleq-signature` over the raw body | [Signature verification](/concepts/signature-verification) — prevents forged webhooks. | | Responding `200` before processing | [Ack mode](/concepts/ack-mode) — acknowledges receipt under the 15s ceiling. | | `ack` callback on success | The job is marked complete. | | `defer` callback on 429 / 503 / 529 | [Backpressure](/concepts/backpressure) — hold and redeliver, no attempt burned. | | `nack` `{ retryable: false }` callback on 4xx | Hard failure. Routed to the DLQ if the queue has `dlqEnabled: true`; otherwise dead-ended. | | `nack` `{ retryable: true }` callback on 5xx / network | Transient failure → retry with backoff. | ## Test it with a publish Once the worker is running, publish a job to trigger a delivery: ```bash curl -X POST "$SQ_API_URL/v1/queues//jobs" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "payload": { "hello": "world" }, "idempotencyKey": "test-1" }' ``` You should see a `200` on the worker side and the job transition to `completed` (assuming `doTheWork` succeeded). Watch the queue detail page in the dashboard to see the state change in real time. --- ## OpenAI queue Source: https://docs.simpleq.io/examples/openai-queue/ A queue pre-configured for OpenAI API workloads, plus a reference worker. The template encodes everything universally true about an OpenAI workload — ack mode, 5-minute `ackTimeout`, retry budget that doesn't get burned by rate limits — so you don't have to choose queue config. The worker translates OpenAI's error signals (`Retry-After`, 5xx) into SimpleQ's `ack` / `nack` / `defer` callbacks. ::: tip Prerequisite This example uses an [OpenAI API key](https://platform.openai.com/api-keys) and a SimpleQ API key (create one on the dashboard's API Keys page). ::: ## Create the queue ::: code-group ```bash [cURL] curl -X POST "$SQ_API_URL/v1/queues" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "template": "openai", "name": "openai-jobs", "webhookUrl": "https://your-worker.example.com/webhook" }' ``` ```js [Node] const res = await fetch(`${process.env.SQ_API_URL}/v1/queues`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.SQ_API_KEY}` }, body: JSON.stringify({ template: 'openai', name: 'openai-jobs', webhookUrl: 'https://your-worker.example.com/webhook', }), }); const queue = await res.json(); ``` ```python [Python] import os, httpx res = httpx.post( f"{os.environ['SQ_API_URL']}/v1/queues", headers={"authorization": f"Bearer {os.environ['SQ_API_KEY']}"}, json={ "template": "openai", "name": "openai-jobs", "webhookUrl": "https://your-worker.example.com/webhook", }, ) queue = res.json() ``` ::: Three fields. `template: "openai"` applies the OpenAI defaults; `name` and `webhookUrl` are yours. Override any default by passing it alongside `template` (e.g. `"concurrency": 50` to raise from the default of 20). ::: details What this template expands to | Field | Value | Why | | --- | --- | --- | | `mode` | `"ack"` | OpenAI calls can exceed the 15s webhook ceiling. Ack mode lets the worker return 200 immediately and report the real outcome via `/ack`, `/nack`, or `/defer`. | | `maxAttempts` | `4` | Budgets only genuine failures. 429/503 are deferred and don't count against this. | | `concurrency` | `20` | How many in-flight OpenAI calls at once. Raise to saturate a high tier. | | `ackTimeout` | `300` (5 min) | Sized above a typical OpenAI generation. Raise if using long-context models with large outputs. | | `ackTimeoutAction` | `"retry"` | If the worker dies silently, redeliver. | | `backoffType` / `backoffDelay` | `"exponential"` / `2`s | Spacing for genuine transient failures (`nack` with `retryable: true`). | | `dlqEnabled` | `true` | Exhausted retries or hard nacks land in the DLQ for inspection. | Rate limiting is deliberately not set — that's account-tier-specific. `defer` on 429/503 handles real backpressure. If you want a proactive guard, see [Add a proactive RPM guard](#add-a-proactive-rpm-guard) below. ::: ## Publish a job The payload is passed straight to `openai.chat.completions.create()` by the worker, so it's just an OpenAI Chat Completions body: ::: code-group ```bash [cURL] curl -X POST "$SQ_API_URL/v1/queues/openai-jobs/jobs" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "idempotencyKey": "summarize-doc-123", "payload": { "model": "gpt-4o-mini", "messages": [{ "role": "user", "content": "Summarize: ..." }] } }' ``` ```js [Node] const res = await fetch(`${process.env.SQ_API_URL}/v1/queues/openai-jobs/jobs`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${process.env.SQ_API_KEY}` }, body: JSON.stringify({ idempotencyKey: 'summarize-doc-123', payload: { model: 'gpt-4o-mini', messages: [{ role: 'user', content: 'Summarize: ...' }], }, }), }); const job = await res.json(); ``` ```python [Python] import os, httpx res = httpx.post( f"{os.environ['SQ_API_URL']}/v1/queues/openai-jobs/jobs", headers={"authorization": f"Bearer {os.environ['SQ_API_KEY']}"}, json={ "idempotencyKey": "summarize-doc-123", "payload": { "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "Summarize: ..."}], }, }, ) job = res.json() ``` ::: `idempotencyKey` dedupes the publish call — a double-publish returns the existing job. See [Idempotency](/concepts/idempotency) for the broader pattern. ## The worker Same shape as the [generic ack worker](/examples/generic-ack-worker/), specialized to OpenAI's error semantics. Pick the tab for your framework. ::: code-group ```js [Express] // worker.mjs import express from 'express'; import crypto from 'node:crypto'; import OpenAI from 'openai'; const openai = new OpenAI({ maxRetries: 0 }); const { SQ_API_URL, SQ_API_KEY, SQ_SIGNING_SECRET } = process.env; const app = express(); function verifySignature(rawBody, header) { if (!header) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SQ_SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } async function callback(jobId, kind, body = {}) { await fetch(`${SQ_API_URL}/v1/jobs/${jobId}/${kind}`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${SQ_API_KEY}` }, body: JSON.stringify(body), }); } app.post('/webhook', express.raw({ type: 'application/json' }), async (req, res) => { if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } res.status(200).end(); const job = JSON.parse(req.body.toString('utf8')); try { const message = await openai.chat.completions.create(job.payload); console.log(`[${job.id}] completed: ${message.usage?.total_tokens} tokens`); await callback(job.id, 'ack'); } catch (err) { const status = err?.status; if (status === 429 || status === 503) { await callback(job.id, 'defer', { retryAfter: parseInt(err.headers?.['retry-after']) || 10, reason: `openai ${status}` }); } else if (status >= 400 && status < 500) { await callback(job.id, 'nack', { retryable: false, reason: `openai ${status}` }); } else { await callback(job.id, 'nack', { retryable: true, reason: `openai ${status ?? 'network'}` }); } } }); app.use(express.json()); app.listen(9000, () => console.log('openai ack worker on :9000')); ``` ```ts [Next.js (App Router)] // app/api/simpleq-webhook/route.ts import crypto from 'node:crypto'; import OpenAI from 'openai'; const openai = new OpenAI({ maxRetries: 0 }); const { SQ_API_URL, SQ_API_KEY, SQ_SIGNING_SECRET } = process.env; function verifySignature(rawBody: Buffer, header: string | null): boolean { if (!header || !SQ_SIGNING_SECRET) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SQ_SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } async function callback(jobId: string, kind: string, body: Record = {}) { await fetch(`${SQ_API_URL}/v1/jobs/${jobId}/${kind}`, { method: 'POST', headers: { 'content-type': 'application/json', authorization: `Bearer ${SQ_API_KEY}` }, body: JSON.stringify(body), }); } export async function POST(req: Request) { const rawBody = Buffer.from(await req.arrayBuffer()); if (!verifySignature(rawBody, req.headers.get('x-simpleq-signature'))) { return new Response(null, { status: 401 }); } const job = JSON.parse(rawBody.toString('utf8')); const response = new Response(null, { status: 200 }); (async () => { try { const message = await openai.chat.completions.create(job.payload); console.log(`[${job.id}] completed: ${message.usage?.total_tokens} tokens`); await callback(job.id, 'ack'); } catch (err: any) { const status = err?.status; if (status === 429 || status === 503) { await callback(job.id, 'defer', { retryAfter: parseInt(err.headers?.['retry-after']) || 10, reason: `openai ${status}` }); } else if (status >= 400 && status < 500) { await callback(job.id, 'nack', { retryable: false, reason: `openai ${status}` }); } else { await callback(job.id, 'nack', { retryable: true, reason: `openai ${status ?? 'network'}` }); } } })(); return response; } ``` ```python [Python (FastAPI)] # worker.py import hashlib import hmac import json import os import httpx from fastapi import BackgroundTasks, FastAPI, Header, Request, Response from openai import APIStatusError, AsyncOpenAI, RateLimitError openai = AsyncOpenAI(max_retries=0) SQ_API_URL = os.environ["SQ_API_URL"] SQ_API_KEY = os.environ["SQ_API_KEY"] SQ_SIGNING_SECRET = os.environ["SQ_SIGNING_SECRET"] app = FastAPI() def verify_signature(raw_body: bytes, header: str | None) -> bool: if not header: return False expected = "sha256=" + hmac.new(SQ_SIGNING_SECRET.encode(), raw_body, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, header) def parse_retry_after(value: str | None, fallback: int = 10) -> int: try: return int(value) or fallback except (TypeError, ValueError): return fallback # Reuse one client across callbacks for connection pooling. _client = httpx.AsyncClient(timeout=10.0) async def callback(job_id: str, kind: str, body: dict | None = None) -> None: await _client.post( f"{SQ_API_URL}/v1/jobs/{job_id}/{kind}", headers={"content-type": "application/json", "authorization": f"Bearer {SQ_API_KEY}"}, json=body or {}, ) async def process(job: dict) -> None: try: message = await openai.chat.completions.create(**job["payload"]) total = message.usage.total_tokens if message.usage else None print(f"[{job['id']}] completed: {total} tokens") await callback(job["id"], "ack") except RateLimitError as err: # 429 retry_after = parse_retry_after(err.response.headers.get("retry-after")) await callback(job["id"], "defer", {"retryAfter": retry_after, "reason": "openai 429"}) except APIStatusError as err: status = err.status_code if status == 503: retry_after = parse_retry_after(err.response.headers.get("retry-after")) await callback(job["id"], "defer", {"retryAfter": retry_after, "reason": "openai 503"}) elif 400 <= status < 500: await callback(job["id"], "nack", {"retryable": False, "reason": f"openai {status}"}) else: await callback(job["id"], "nack", {"retryable": True, "reason": f"openai {status}"}) except Exception: await callback(job["id"], "nack", {"retryable": True, "reason": "openai network"}) @app.post("/webhook") async def webhook( request: Request, background_tasks: BackgroundTasks, x_simpleq_signature: str | None = Header(default=None), ): # Verify over the raw bytes BEFORE parsing. raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) job = json.loads(raw) # BackgroundTasks runs in-process — if this worker crashes mid-job the # work is lost, but ackTimeout + ackTimeoutAction: "retry" redelivers it. background_tasks.add_task(process, job) return Response(status_code=200) # uvicorn worker:app --port 9000 ``` ::: ## Run it ::: code-group ```bash [Node] npm init -y npm i express openai OPENAI_API_KEY=sk-... \ SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ node worker.mjs ``` ```bash [Python] pip install fastapi uvicorn httpx openai OPENAI_API_KEY=sk-... \ SQ_API_URL=https://api.simpleq.io \ SQ_API_KEY=sq_live_... \ SQ_SIGNING_SECRET=... \ uvicorn worker:app --port 9000 ``` ::: Swap `express` (or FastAPI) for your framework of choice. The OpenAI SDK is the same across all tabs. - **`OPENAI_API_KEY`** — from [platform.openai.com/api-keys](https://platform.openai.com/api-keys). - **`SQ_API_URL`** — the SimpleQ API base your worker calls back to: `https://api.simpleq.io`. - **`SQ_API_KEY`** — create one on the dashboard's API Keys page. - **`SQ_SIGNING_SECRET`** — your queue's signing secret. See [Signature verification](/concepts/signature-verification) for where to find it. Point your queue's `webhookUrl` at your worker endpoint (e.g. `http://:9000/webhook`). ## Error routing | OpenAI response | Worker calls | Effect | | --- | --- | --- | | Success | `/ack` | Job completed. | | 429 / 503 (rate-limited / unavailable) | `/defer` with `Retry-After` relayed | Held and redelivered; no attempt burned. | | 4xx (bad request, auth, not found) | `/nack` `{retryable: false}` | Job dead-lettered. | | Non-503 5xx, network, SDK timeout | `/nack` `{retryable: true}` | Retried with backoff; counts against `maxAttempts`. | ## Three gotchas worth internalizing **1. Disable the SDK's own retries (`maxRetries: 0`).** The OpenAI SDK retries 429 and ≥500 twice by default with its own backoff. That swallows the error, you never see the `Retry-After`, and you can't relay it to `/defer`. Worse, the SDK's retries and SimpleQ's redelivery fight each other. With retries off, the 429 surfaces in your `catch`, you read its `Retry-After`, and SimpleQ owns the backoff. The example sets this; don't undo it. **2. The template is workload-shaped, not account-shaped.** The defaults encode facts true for any OpenAI workload (ack mode, 5-min `ackTimeout`, retry budget). They don't encode your account's RPM/TPM tier — that's account-shaped, not workload-shaped, and tuning is below. **3. OpenAI doesn't have a 529.** Unlike Anthropic, OpenAI uses 429 for both rate limiting and overload. The `Retry-After` header is your signal for both. The worker handles them the same way — defer with the relayed delay. ## Add a proactive RPM guard If you know your OpenAI tier and want SimpleQ to space out deliveries below your RPM ceiling, set `rateLimitMax` to ~80% of your RPM and `rateLimitWindow` to 60: ```bash curl -X PUT "$SQ_API_URL/v1/queues/" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "rateLimitMax": 4000, "rateLimitWindow": 60 }' ``` Two caveats: the limiter is per-worker (N workers running = N × the configured rate), and OpenAI's TPM usually binds before RPM for large-context models — a request counter can't see token cost. The proactive guard shaves the count of 429s; `defer` is what actually saves you at a real limit. --- ## Standard mode queue Source: https://docs.simpleq.io/examples/standard-mode-queue/ A standard-mode queue delivers each job to your webhook; your handler does the work and returns `2xx` to mark the job done. This is the simplest way to run jobs on SimpleQ. Your worker verifies the webhook signature, does the work, and replies with a status code — `2xx` if it succeeded, or a `5xx` to have SimpleQ retry. Drop in your own work and you have the integration. ::: tip Prerequisite This example requires a queue created with `mode: 'standard'` — the default, so only `name` and `webhookUrl` are required. ::: ## The worker Pick the tab for your language — each tab is a complete, single-file worker you can copy and run. The handler runs your work inline and replies with a status code. SimpleQ waits up to 15 seconds for the response. ::: code-group ```js [Node (Express)] import express from 'express'; import crypto from 'node:crypto'; const { SQ_SIGNING_SECRET } = process.env; const app = express(); // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. function verifySignature(rawBody, header) { if (!header) return false; const expected = 'sha256=' + crypto.createHmac('sha256', SQ_SIGNING_SECRET).update(rawBody).digest('hex'); const a = Buffer.from(header); const b = Buffer.from(expected); return a.length === b.length && crypto.timingSafeEqual(a, b); } // Replace this with your real work. It MUST finish well under the 15s ceiling. // Return on success; throw to fail the attempt (→ 500, retried with backoff). async function doTheWork(payload) { // your code here } app.post( '/webhook', express.raw({ type: 'application/json' }), async (req, res) => { if (!verifySignature(req.body, req.headers['x-simpleq-signature'])) { return res.status(401).end(); } const job = JSON.parse(req.body.toString('utf8')); try { await doTheWork(job.payload); // A 2xx within 15s marks the job completed. res.status(200).end(); } catch (err) { // Failed attempt → retried with backoff up to maxAttempts, then the DLQ. res.status(500).end(); } }, ); // Other routes can use global JSON parsing as normal. app.use(express.json()); app.listen(9000, () => { console.log('standard worker on :9000'); }); ``` ```python [Python (FastAPI)] import hashlib import hmac import json import os from fastapi import FastAPI, Header, Request, Response SQ_SIGNING_SECRET = os.environ["SQ_SIGNING_SECRET"] app = FastAPI() # Verify the webhook came from SimpleQ. # See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. def verify_signature(raw_body: bytes, header: str | None) -> bool: if not header: return False expected = "sha256=" + hmac.new(SQ_SIGNING_SECRET.encode(), raw_body, hashlib.sha256).hexdigest() return hmac.compare_digest(expected, header) # Replace this with your real work. It MUST finish well under the 15s ceiling. # Return on success; raise to fail the attempt (→ 500, retried with backoff). async def do_the_work(payload: dict) -> None: # your code here pass @app.post("/webhook") async def webhook(request: Request, x_simpleq_signature: str | None = Header(default=None)): # Read the raw body BEFORE parsing — the HMAC is over the exact bytes. raw = await request.body() if not verify_signature(raw, x_simpleq_signature): return Response(status_code=401) job = json.loads(raw) try: await do_the_work(job["payload"]) # A 2xx within 15s marks the job completed. return Response(status_code=200) except Exception: # Failed attempt → retried with backoff up to maxAttempts, then the DLQ. return Response(status_code=500) # Run with: uvicorn worker:app --port 9000 ``` ```go [Go (net/http)] package main import ( "crypto/hmac" "crypto/sha256" "encoding/hex" "encoding/json" "io" "log" "net/http" "os" ) var sqSigningSecret = os.Getenv("SQ_SIGNING_SECRET") // A SimpleQ job as delivered to your webhook. Your data is under Payload. type Job struct { ID string `json:"id"` Queue string `json:"queue"` Payload json.RawMessage `json:"payload"` Attempt int `json:"attempt"` MaxAttempts int `json:"maxAttempts"` CreatedAt string `json:"createdAt"` } // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. func verifySignature(rawBody []byte, header string) bool { if header == "" { return false } mac := hmac.New(sha256.New, []byte(sqSigningSecret)) mac.Write(rawBody) expected := "sha256=" + hex.EncodeToString(mac.Sum(nil)) return hmac.Equal([]byte(expected), []byte(header)) } // Replace this with your real work. It MUST finish well under the 15s ceiling. // Return nil on success; return an error to fail the attempt (→ 500, retried). func doTheWork(payload json.RawMessage) error { // your code here return nil } func webhook(w http.ResponseWriter, r *http.Request) { // Read the raw body BEFORE parsing — the HMAC is over the exact bytes. raw, err := io.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } if !verifySignature(raw, r.Header.Get("x-simpleq-signature")) { w.WriteHeader(http.StatusUnauthorized) return } var job Job if err := json.Unmarshal(raw, &job); err != nil { w.WriteHeader(http.StatusBadRequest) return } // Run the work synchronously, then let the status code report the outcome. // A 2xx within 15s marks the job completed. if err := doTheWork(job.Payload); err != nil { // Failed attempt → retried with backoff up to maxAttempts, then the DLQ. w.WriteHeader(http.StatusInternalServerError) return } w.WriteHeader(http.StatusOK) } func main() { http.HandleFunc("/webhook", webhook) log.Println("standard worker on :9000") log.Fatal(http.ListenAndServe(":9000", nil)) } ``` ```java [Java (Spring Boot)] // Spring Boot single-file standard-mode worker. Deps (Maven): // org.springframework.boot:spring-boot-starter-web // com.fasterxml.jackson.core:jackson-databind (bundled with starter-web) // Java 17+. package com.example.simpleq; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; @SpringBootApplication public class Worker { public static void main(String[] args) { // This worker example listens on port 9000 (set server.port=9000). SpringApplication.run(Worker.class, args); System.out.println("standard worker on :9000"); } } @RestController class WebhookController { static final String SQ_SIGNING_SECRET = System.getenv("SQ_SIGNING_SECRET"); static final ObjectMapper MAPPER = new ObjectMapper(); // Verify the webhook came from SimpleQ. // See https://docs.simpleq.io/concepts/signature-verification for the full breakdown. boolean verifySignature(byte[] rawBody, String header) { if (header == null) return false; String expected = "sha256=" + hmacHex(rawBody); // Constant-time compare over the raw bytes. return MessageDigest.isEqual( expected.getBytes(StandardCharsets.UTF_8), header.getBytes(StandardCharsets.UTF_8)); } static String hmacHex(byte[] rawBody) { try { Mac mac = Mac.getInstance("HmacSHA256"); mac.init(new SecretKeySpec(SQ_SIGNING_SECRET.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); byte[] digest = mac.doFinal(rawBody); StringBuilder sb = new StringBuilder(digest.length * 2); for (byte b : digest) sb.append(String.format("%02x", b)); return sb.toString(); } catch (Exception e) { throw new RuntimeException(e); } } // Replace this with your real work. It MUST finish well under the 15s ceiling. // Return on success; throw to fail the attempt (-> 500, retried with backoff). void doTheWork(JsonNode payload) { // your code here } @PostMapping(value = "/webhook", consumes = MediaType.ALL_VALUE) ResponseEntity webhook( @RequestBody(required = false) byte[] rawBody, @RequestHeader(value = "x-simpleq-signature", required = false) String signature) { // Read the raw body BEFORE parsing -- the HMAC is over the exact bytes. byte[] raw = rawBody == null ? new byte[0] : rawBody; if (!verifySignature(raw, signature)) { return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build(); } JsonNode job; try { job = MAPPER.readTree(raw); } catch (Exception e) { return ResponseEntity.status(HttpStatus.BAD_REQUEST).build(); } // Run the work synchronously, then let the status code report the // outcome. A 2xx within 15s marks the job completed. try { doTheWork(job.path("payload")); return ResponseEntity.ok().build(); } catch (Exception err) { // Failed attempt -> retried with backoff up to maxAttempts, then the DLQ. return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } } } ``` ```php [PHP] { // Read the raw body BEFORE parsing — the HMAC is over the exact bytes. using var ms = new MemoryStream(); await request.Body.CopyToAsync(ms); var raw = ms.ToArray(); var header = request.Headers["x-simpleq-signature"].FirstOrDefault(); if (!VerifySignature(raw, header)) return Results.Unauthorized(); var job = JsonSerializer.Deserialize(raw); // Run the work synchronously, then let the status code report the outcome. // A 2xx within 15s marks the job completed. try { await DoTheWork(job.GetProperty("payload")); return Results.Ok(); } catch (Exception) { // Failed attempt → retried with backoff up to maxAttempts, then the DLQ. return Results.StatusCode(500); } }); Console.WriteLine("standard worker on :9000"); app.Run("http://0.0.0.0:9000"); ``` ::: ## Run it ::: code-group ```bash [Node] npm init -y npm i express SQ_SIGNING_SECRET=... \ node worker.mjs ``` ```bash [Python] pip install fastapi uvicorn SQ_SIGNING_SECRET=... \ uvicorn worker:app --port 9000 ``` ```bash [Go] SQ_SIGNING_SECRET=... \ go run worker.go ``` ```bash [Java] # spring-boot-starter-web on the classpath; server.port=9000 SQ_SIGNING_SECRET=... \ ./mvnw spring-boot:run -Dspring-boot.run.arguments=--server.port=9000 ``` ```bash [PHP] # A synchronous handler works on any SAPI. For a quick local test: SQ_SIGNING_SECRET=... \ php -S 0.0.0.0:9000 worker.php ``` ```bash [C#/.NET] SQ_SIGNING_SECRET=... \ dotnet run ``` ::: - **`SQ_SIGNING_SECRET`** — your queue's signing secret. See [Signature verification](/concepts/signature-verification) for where to find it (dashboard reveal or `POST /v1/queues` response, depending on how the queue was created). This worker example listens on port 9000, point your queue's `webhookUrl` to your webhook: `http://:9000/webhook`. ## What each piece does | Piece | Concept | | --- | --- | | Verifying `x-simpleq-signature` over the raw body | [Signature verification](/concepts/signature-verification) — prevents forged webhooks. | | Returning `2xx` after the work completes | [Webhook delivery](/concepts/webhook-delivery) — a `2xx` within 15s marks the job completed. | | Returning `500` (or any other non-2xx, or no response in 15s) | Failed attempt → retried with backoff up to `maxAttempts`, then the DLQ if `dlqEnabled: true`. | ## Relaying backpressure If your work calls a downstream that rate-limits you (HTTP `429` / `503` / `529`), return that same status with a `Retry-After` header instead of `500`. SimpleQ holds the job and redelivers it after the delay **without burning an attempt** — see [Backpressure](/concepts/backpressure) for the pattern. ## Test it with a publish Once the worker is running, publish a job to trigger a delivery: ```bash curl -X POST "$SQ_API_URL/v1/queues//jobs" \ -H "Authorization: Bearer $SQ_API_KEY" \ -H "content-type: application/json" \ -d '{ "payload": { "hello": "world" }, "idempotencyKey": "test-1" }' ``` You should see a `200` on the worker side and the job transition to `completed` (assuming `doTheWork` succeeded). Watch the queue detail page in the dashboard to see the state change in real time. --- ## API reference (condensed from OpenAPI 0.0.1) Queues as a Service. POST a job then SimpleQ queues it and POSTs it to your webhook with retries, backoff, a dead-letter queue, and rate-aware redelivery handled for you. Auth: `Authorization: Bearer sq_live_...` ### POST /v1/queues/{queueName}/jobs Tag: Jobs Publish a job Queues a job for delivery to the queue’s webhook. If `idempotencyKey` matches a previously published job, returns the existing job (200) instead of creating a duplicate. Request body: - `payload`: object (required) — Arbitrary JSON object delivered as-is to your webhook. SimpleQ does not inspect or validate the contents. - `idempotencyKey`: string — Optional key for deduplication. Publishing twice with the same key returns the original job instead of creating a duplicate. - `delay`: number — Delay delivery by this many seconds (max 86400 = 24h). Decimals allowed. Responses: - 200: Idempotent hit — existing job returned - 201: Job created - 400: Validation error - 404: Queue not found ### GET /v1/queues Tag: Queues List queues Responses: - 200: Array of queues (signingSecret omitted) ### POST /v1/queues Tag: Queues Create a queue Creates a queue and generates a `signingSecret` (returned once on creation). Supports named templates that prefill defaults: `anthropic`, `openai`. Explicit fields override template values. Request body: - `name`: string (required) - `webhookUrl`: string (required) - `maxAttempts`: integer - `concurrency`: integer - `dlqEnabled`: boolean - `mode`: string (standard | ack) - `rateLimitMax`: integer | null - `rateLimitWindow`: number - `ackTimeout`: number - `ackTimeoutAction`: string (retry | dead) - `backoffType`: string (fixed | exponential) - `backoffDelay`: number - `template`: string (anthropic | openai) — Optional. Prefills queue defaults from a named template. Explicit fields override the template values. Responses: - 201: Queue created (signingSecret included) - 400: Validation error ### GET /v1/queues/{id} Tag: Queues Get a queue Responses: - 200: Queue (signingSecret omitted) - 404: Queue not found ### PUT /v1/queues/{id} Tag: Queues Update queue config Partial update. `name` cannot be changed after creation. Request body: - `webhookUrl`: string - `maxAttempts`: integer - `concurrency`: integer - `dlqEnabled`: boolean - `mode`: string (standard | ack) - `rateLimitMax`: integer | null - `rateLimitWindow`: number - `ackTimeout`: number - `ackTimeoutAction`: string (retry | dead) - `backoffType`: string (fixed | exponential) - `backoffDelay`: number Responses: - 200: Updated queue (signingSecret included) - 400: Validation error - 404: Queue not found ### DELETE /v1/queues/{id} Tag: Queues Delete a queue Deletes a queue. The queue stops accepting new jobs and the name becomes available to re-use immediately. Existing jobs remain retrievable by job ID for inspection. Responses: - 204: Queue deleted - 404: Queue not found ### GET /v1/queues/{id}/jobs Tag: Queues List jobs in a queue Responses: - 200: Paginated jobs - 404: Queue not found ### GET /v1/queues/{id}/dlq Tag: DLQ List dead-letter jobs Responses: - 200: Paginated DLQ entries - 404: Queue not found ### POST /v1/queues/{id}/dlq/{jobId}/retry Tag: DLQ Retry a single DLQ job Responses: - 201: New job created from DLQ entry - 404: Queue or DLQ job not found - 409: DLQ job already retried ### POST /v1/queues/{id}/dlq/retry Tag: DLQ Bulk retry DLQ jobs Retries up to 1000 DLQ jobs at once. Provide either `jobIds: string[]` or `all: true`. Responses: - 200: Bulk retry result - 400: Validation error - 404: Queue not found ### GET /v1/jobs/{id} Tag: Jobs Get job status Responses: - 200: Job document - 404: Job not found ### POST /v1/jobs/{id}/retry Tag: Jobs Retry a failed job Re-queues a `failed` job. Returns 400 if the job is in any other state. Responses: - 200: Job re-queued - 400: Job is not in failed state - 404: Job not found ### POST /v1/jobs/{id}/ack Tag: Ack mode Acknowledge a job (ack-mode queues only) Signals successful completion of a job in `awaiting_ack`. Only valid for queues with `mode: "ack"`. Responses: - 200: Ack accepted - 400: Job is not awaiting ack, or queue does not use ack mode - 404: Job not found ### POST /v1/jobs/{id}/nack Tag: Ack mode Signal failure for a job (ack-mode queues only) Marks an `awaiting_ack` job as failed. `retryable: true` re-queues with backoff; `retryable: false` dead-letters immediately. Request body: - `retryable`: boolean - `reason`: string Responses: - 200: Nack accepted - 400: Validation error, job not awaiting ack, or queue not in ack mode - 404: Job not found ### POST /v1/jobs/{id}/defer Tag: Ack mode Apply backpressure (ack-mode queues only) Holds an `awaiting_ack` job for `retryAfter` seconds, then redelivers — without burning an attempt. Use when a downstream returns 429/503/529. Request body: - `retryAfter`: number (required) - `reason`: string Responses: - 200: Defer accepted - 400: Validation error, job not awaiting ack, or queue not in ack mode - 404: Job not found