Queues & Backends¶
The queue system is the asynchronous heart of daycry/jobs v3. You describe a unit of work with
the fluent builder, dispatch it onto a backend, and a worker process later pulls it off the
queue and runs it. Every backend implements one small, lease-oriented contract —
Daycry\Jobs\Queues\QueueBackend — so the worker pipeline, the cron runner and your application
code are completely decoupled from where messages are actually stored (a SQL table, Redis,
beanstalkd or Azure Service Bus).
This page documents how to enqueue work, the QueueBackend contract and JobLease lifecycle, the
worker pipeline, the at-least-once delivery guarantee, and the precise semantics and
configuration of each shipped backend.
Note: Delivery is at-least-once for every persistent backend. A message may be delivered more than once (after a crash, a lease expiry, or a reaper recovery), so your handlers should be idempotent. See Idempotency.
Enqueueing work¶
There are two entry points, both on the Daycry\Jobs\Jobs facade.
Jobs::define(...)->dispatch()¶
Jobs::define($handler, $payload) opens a fluent JobBuilder. Chain configuration, then call
dispatch() to serialize and enqueue the resulting immutable JobDefinition. With no argument
dispatch() uses the default backend (Config\Jobs::$worker); pass a backend name to override it.
The return value is the backend-assigned identifier (a string).
use Daycry\Jobs\Jobs;
// Enqueue onto the default backend ($worker)
$id = Jobs::define('command', 'app:report')
->queue('reports')
->maxRetries(3)
->dispatch();
// Enqueue onto a specific backend by name
$id = Jobs::define('command', 'app:report')
->queue('reports')
->dispatch('database');
// Schedule for the future (honoured by database/redis/beanstalk/serviceBus)
$id = Jobs::define('url', ['method' => 'POST', 'url' => 'https://example.com/webhook'])
->queue('web')
->scheduledAt(new DateTimeImmutable('+10 minutes'))
->dispatch('redis');
Jobs::backend()¶
When you want to talk to the backend directly (for example to enqueue a pre-built definition, or to
call lease verbs in a test), resolve it with Jobs::backend():
use Daycry\Jobs\Jobs;
$backend = Jobs::backend('redis'); // or Jobs::backend() for the default $worker
$id = $backend->enqueue(
Jobs::define('command', 'app:report')->queue('reports')->toDefinition(),
);
Backends are resolved through Daycry\Jobs\Queues\BackendFactory, which maps a name to a class via
Config\Jobs::$backends and throws QueueException::forInvalidWorker() for an unknown name.
Note: With the default
syncbackend,dispatch()/enqueue()runs the job inline and returns a synthetic id. Nothing is stored and no worker is involved. Switch$worker(or pass a backend name) to a persistent backend for real asynchronous processing.
The QueueBackend contract¶
Every backend implements this interface. The contract is stateless with respect to the in-flight
message: the worker holds a JobLease and passes it back to ack/nack/abandon, so the backend
never has to remember which message a given worker is currently processing.
namespace Daycry\Jobs\Queues;
use Daycry\Jobs\Definition\JobDefinition;
interface QueueBackend
{
public function enqueue(JobDefinition $definition): string;
public function fetch(string $queue): ?JobLease;
public function ack(JobLease $lease): bool;
public function nack(JobLease $lease, ?int $delaySeconds = null): bool;
public function abandon(JobLease $lease): bool;
public function reapExpired(string $queue, int $visibilityTimeout): int;
}
Method |
Responsibility |
|---|---|
|
Persist the definition into the queue. Returns the backend-assigned id. |
|
Lease one ready message, or |
|
Mark the leased message as successfully processed; the backend removes it permanently. |
|
Processing failed — redeliver the message, optionally after |
|
Stop holding the lease without retrying (unprocessable / retries exhausted). Routes to a native dead-letter facility where available; otherwise equivalent to |
|
Reclaim messages whose lease expired (crashed/stalled worker) so they become eligible again. Returns the number of messages recovered. |
The JobLease¶
fetch() returns a Daycry\Jobs\Queues\JobLease (an immutable value object) for each in-flight
message. The same lease is handed back to settle the message.
final readonly class JobLease
{
public JobEnvelope $envelope; // the decoded message
public string $token; // opaque backend identifier needed to settle
public string $ownerToken; // random token minted by the worker at fetch()
public DateTimeImmutable $expiresAt;// visibility deadline
public string $backend; // backend name
}
Field |
Meaning |
|---|---|
|
The decoded |
|
Opaque backend handle used to ack/nack/abandon: the row primary key (database), the raw payload string (redis processing list), the |
|
A random token minted by the worker at |
|
The visibility deadline. Past this time the backend may redeliver the message to another consumer. |
|
The backend name ( |
Helpers:
JobLease::withRelativeExpiry($envelope, $token, $owner, $secondsFromNow, $backend)— factory that computes the deadline as now + N seconds (used by every backend’sfetch()).$lease->isExpired()— whether the visibility deadline has passed. A worker that detects this must not ack, because the message may already have been redelivered.$lease->renew($secondsFromNow)— returns a copy with an extended deadline (token/owner preserved). The redis backend re-stamps its in-flight record onrenewLease().
The wire envelope¶
Every backend serializes the same canonical JSON object (built by
Daycry\Jobs\Queues\EnvelopeFactory::toWire()) so a message enqueued through one path can be
consumed through another:
{
"job": "command",
"payload": "app:report",
"queue": "reports",
"priority": 5,
"maxRetries": 3,
"attempts": 0,
"name": "daily-report",
"identifier": "9f1c2a...",
"idempotencyKey": null,
"schedule": null,
"_sig": "<HMAC-SHA256>"
}
The _sig HMAC is computed over the immutable identity fields only (job, payload, queue,
priority, maxRetries, name, identifier, idempotencyKey) — the mutable attempts and schedule are excluded
so the signature survives a requeue (where backends re-serialize with attempts + 1). See
Envelope signing.
The worker pipeline¶
The long-running worker (Daycry\Jobs\Worker\QueueWorker, driven by the jobs:queue:work command)
runs every message through a fixed pipeline. Each call to processOnce($queue) performs exactly one
cycle:
fetch -> verify signature -> idempotency guard -> run ONE attempt -> ack / nack(backoff) / abandon
fetch — lease one ready message. If the queue is empty, returns a
WorkerResult('empty').verify signature — when
Config\Jobs::$verifyEnvelopeSignatureis true and a signing key is configured, the_sigHMAC is verified. A missing or invalid signature isabandoned and logged ascritical(WorkerResult('rejected')).idempotency guard — if the envelope carries an
idempotencyKeythat was already processed, the message is acked and skipped (WorkerResult('skipped-idempotent')).run one attempt — the job runs exactly once through
JobRuntime(handler resolution, timeout, single-instance lock, output capture). The runtime never loops or sleeps for retries.settle:
success →
ack(message removed) →WorkerResult('acked').failure with retries left (
attemptIndex < maxRetries) →nack($lease, $delay)where$delaycomes from the backoff policy →WorkerResult('requeued').failure with retries exhausted →
abandon($lease)(dead-letter / drop) → loggedcritical→WorkerResult('dead-lettered').
Note: The retry decision lives in the worker, not the runtime. A job runs at most
maxRetries + 1times in total. See Retries & Backoff.
WorkerResult statuses¶
QueueWorker::processOnce() returns a Daycry\Jobs\Worker\WorkerResult whose status is one of:
Status |
Meaning |
|---|---|
|
The queue had no ready message. |
|
The job succeeded and was removed. |
|
The job failed but had retries left; it was nacked with a backoff delay. |
|
The job failed and exhausted its retries; it was abandoned. |
|
The message had an invalid payload or signature; it was abandoned. |
|
The idempotency key had already been processed; the message was acked without running. |
At-least-once delivery¶
Persistent backends guarantee that a successfully enqueued message is delivered at least once.
The worker holds the message under a visibility lease while it runs; the message is only removed on
ack. If the worker crashes between fetch() and ack(), the lease eventually expires and the
message becomes eligible again (via the reaper for
database/redis, or natively for beanstalk/Service Bus). This means:
A job can run more than once. Make handlers idempotent (use
idempotencyKey()for built-in de-duplication — see Idempotency).A
nackre-serializes the message withattempts + 1so the retry counter is never lost on redelivery.
Recovering crashed workers (reaper)¶
If a worker dies while holding a lease, the message is invisible until its visibility timeout
elapses. The jobs:queue:reap command calls QueueBackend::reapExpired() to return such messages
to the ready state:
# Recover stranded messages on the 'reports' queue (default backend)
php spark jobs:queue:reap reports
# Explicit backend
php spark jobs:queue:reap reports --backend redis
Run it periodically (for example once a minute from system cron) for the database and redis
backends. beanstalk and serviceBus recover in-flight messages natively when their
reservation/lock expires, so their reapExpired() is a no-op (returns 0).
The visibility timeout applied by the command is redisProcessingVisibilityTimeout when the backend
is redis, and databaseVisibilityTimeout otherwise (both default to 300 seconds).
Warning: Set the visibility timeout greater than your longest expected job runtime. If a job runs longer than the timeout, the reaper (or the broker) will consider the worker crashed and redeliver the message while it is still running — causing a duplicate execution.
Backends¶
Backends are declared in Config\Jobs::$backends (name → class) and the default is selected by
Config\Jobs::$worker:
public array $backends = [
'sync' => SyncBackend::class,
'database' => DatabaseBackend::class,
'redis' => RedisBackend::class,
'beanstalk' => BeanstalkBackend::class,
'serviceBus' => ServiceBusBackend::class,
];
public string $worker = 'sync'; // default backend when none is specified
Available queue names come from Config\Jobs::$queues (a comma-separated string or a list). The
worker defaults to the first configured queue when you do not name one.
sync¶
Synchronous, in-process. SyncBackend::enqueue() does not store anything — it runs the job
inline through JobRuntime and returns a synthetic id (sync-<hex>). Because work happens at
enqueue time, there is nothing to pull: fetch() always returns null, the lease verbs are no-ops
returning true, and reapExpired() recovers nothing.
// With $worker = 'sync', this runs the command immediately and returns once it finishes.
$id = Jobs::define('command', 'app:report')->dispatch();
Use sync for local development, tests, and CLI flows where a real broker is undesirable. It is the
default $worker.
Note:
maxRetries,nackbackoff and the reaper are meaningless forsyncbecause there is no queue. A failing inline job simply returns a failedExecutionResult.
database¶
Persistent relational storage (the queues table). Reliable, fully inspectable with plain SQL, and a
good default for most applications that already run a database.
Storage and claim. enqueue() inserts a row with status = 'pending'. fetch() atomically
claims one ready row using SELECT ... FOR UPDATE SKIP LOCKED (MySQL 8+, PostgreSQL 9.5+) with an
optimistic-locking fallback (exponential backoff + jitter) for engines that lack SKIP LOCKED
(e.g. SQLite). The claim query is:
SELECT id FROM queues
WHERE queue = ? AND status = 'pending' AND schedule <= ?
AND (available_at IS NULL OR available_at <= ?)
ORDER BY priority ASC, schedule ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
The claimed row is stamped status = 'in_progress', reserved_at = now, owner_token = <random>.
Lease verbs.
Verb |
Effect on the row |
|---|---|
|
|
|
Requeue in place: same row/id, |
|
|
|
Rows left |
Recovery. Run jobs:queue:reap <queue> periodically. The timeout used is
databaseVisibilityTimeout (default 300s).
Configuration.
public array $database = [
'group' => null, // CI4 connection group (null = default)
'table' => 'queues',
];
public int $databaseVisibilityTimeout = 300; // seconds; > your longest job runtime
Migration. The table and lease columns (reserved_at, available_at, owner_token) plus the
composite claim index are created by:
php spark migrate -n "Daycry\Jobs"
Note: The
queuestable grows indefinitely (acked rows becomecompleted, abandoned rows becomefailed). Schedulejobs:queue:purgeto prune it.
redis¶
Requires the ext-redis extension. A reliable-queue implementation built on four keys per queue
(prefix jobs:):
Key |
Type |
Role |
|---|---|---|
|
LIST |
Ready messages ( |
|
ZSET |
Future / delayed messages (score = due unix timestamp). |
|
LIST |
In-flight messages (atomic move via |
|
HASH |
|
Fetch. fetch() first promotes any due delayed messages ({q}-delayed → {q}-waiting) using a
Lua script (atomic, prevents duplicate promotion under concurrent workers; falls back to a zRem
gate). It then atomically moves one message from waiting to processing via RPOPLPUSH, stamps a
{ts, owner} entry in processing-meta, and returns the lease.
Lease verbs.
Verb |
Effect |
|---|---|
|
Remove the message from |
|
Re-serialize the payload with |
|
Drop from |
|
For every |
Lease renewal. A long-running worker can call renewLease($lease) to re-stamp the
processing-meta timestamp so the reaper does not reclaim a still-running job.
Recovery. Run jobs:queue:reap <queue> --backend redis periodically. The timeout used is
redisProcessingVisibilityTimeout (default 300s).
Configuration.
public int $redisProcessingVisibilityTimeout = 300; // seconds; > your longest job runtime
The connection is taken from the CodeIgniter Cache config (the redis handler settings).
Note:
Config\Jobs::$blockingFetch/$blockingFetchTimeoutare reserved options describing the intent to use blocking reads (BRPOPLPUSH). The shippedjobs:queue:workworker fetches non-blockingly and sleepspollIntervalseconds between empty cycles; see Worker Behaviour.
beanstalk¶
Wraps beanstalkd tubes via pda/pheanstalk v5. Each queue maps to a beanstalkd tube.
Enqueue. enqueue() puts the canonical wire payload onto the tube with a computed delay
(derived from scheduledAt) and a TTR (time-to-run). The TTR is resolved from the definition’s
timeout, falling back to Config\Jobs::$jobTimeout, then to the backend default (3600s). It
returns the wire identifier (stable across requeues), not the volatile beanstalkd job id.
Fetch. fetch() watches the target tube, ignores all other watched tubes, then
reserveWithTimeout(5). A corrupt body is bury-ed so it neither blocks the tube nor is silently
lost.
Lease verbs.
Verb |
Effect |
|---|---|
|
|
|
Re-serialize with |
|
|
|
No-op (returns |
Configuration.
public array $beanstalk = [
'host' => '127.0.0.1',
'port' => 11300,
];
Warning: The TTR is the beanstalkd-side visibility window. If a job runs longer than its TTR, beanstalkd re-serves it to another worker mid-execution. Ensure the resolved TTR (
timeout/jobTimeout) is larger than the real runtime.
serviceBus¶
Azure Service Bus over HTTP using the peek-lock pattern (SAS authentication). Each queue maps to a Service Bus queue/entity.
Enqueue. enqueue() POSTs the canonical wire payload to /<queue>/messages. A future
scheduledAt is mapped to the ScheduledEnqueueTimeUtc broker property.
Fetch. fetch() POSTs /<queue>/messages/head?timeout=<lock> (peek-lock). A 201 Created
holds the message under a LockToken; anything else (204 and beyond) means no usable message.
The messageId + lockToken form the settle location (.../messages/<messageId>/<lockToken>).
Lease verbs.
Verb |
Effect |
|---|---|
|
|
|
Service Bus has no native “unlock with delay”, so a fresh copy of the wire (with |
|
Settle ( |
|
No-op (returns |
Configuration.
public array $serviceBus = [
'url' => '', // https://<namespace>.servicebus.windows.net/<queue>
'issuer' => '', // SAS key name
'secret' => '', // SAS key value — prefer env('SERVICEBUS_SECRET')
];
public int $serviceBusLockTimeout = 60; // seconds requested when peek-locking
Warning: Set
serviceBusLockTimeout>= your maximum job runtime. If the lock expires while the job is still running, the broker redelivers the message to another worker.
Choosing a backend¶
Need |
Recommended |
|---|---|
Inline execution / development / tests |
|
Operational transparency, SQL inspection, no extra infrastructure |
|
Fast in-memory processing, delayed messages |
|
Mature, dedicated tube-based broker |
|
Cloud-native (Azure), managed broker |
|
Security¶
Queue messages are HMAC-signed at enqueue time and verified by the worker, and handlers can be
restricted per queue with Config\Jobs::$queueHandlers. See
Per-queue handler allowlist and
Envelope signing.
See also¶
CLI Commands —
jobs:queue:work,jobs:queue:reap,jobs:queue:purge.Operations — running workers in production, the reaper, scaling, observability.
Scheduling — enqueueing work on a cron schedule.
Retries & Backoff — the retry model and backoff strategies.
Configuration — every backend and worker setting.