Architecture

v3 is a single, layered architecture. Each layer has one responsibility and communicates with the next through small, immutable value objects. There is no mutable god-object: definitions describe work, handlers contain only business logic, backends move messages, and the worker orchestrates a single attempt per delivery. This page explains the layers, the value objects that flow between them, the worker and cron pipelines, the QueueBackend contract, and the at-least-once delivery guarantee.

If you are new to the package, read Quick Start first; this page is the conceptual map behind it.


Layers

Layer

Namespace

Responsibility

Definition

Daycry\Jobs\Definition

Describe a job. JobBuilder (fluent, mutable accumulator) produces an immutable JobDefinition value object.

Handlers

Daycry\Jobs\Handlers

Business logic only. JobHandlerInterface::handle(JobContext); HandlerRegistry resolves keys and enforces the per-queue allowlist.

Queues

Daycry\Jobs\Queues

Transport. One QueueBackend contract with lease semantics; JobEnvelope is the wire message, JobLease is an in-flight reservation. EnvelopeFactory builds and signs the wire payload; BackendFactory resolves backends from config.

Backends

Daycry\Jobs\Queues\Backends

Concrete transports: SyncBackend, DatabaseBackend, RedisBackend, BeanstalkBackend, ServiceBusBackend.

Signing

Daycry\Jobs\Queues\Signing

EnvelopeSigner — HMAC-SHA256 sign/verify with constant-time comparison.

Execution

Daycry\Jobs\Execution

Run a single attempt. JobRuntime, JobContext, ExecutionResult, Timeout, RetryPolicy/RetryPolicyFixed, IdempotencyGuard, SingleInstanceLock.

Worker

Daycry\Jobs\Worker

QueueWorker integrates the pipeline: fetch → verify → idempotency → run one attempt → ack/nack/abandon. WorkerResult reports the outcome.

Cron

Daycry\Jobs\Cron

Scheduler (fluent registry of JobBuilders) and CronRunner (evaluate due definitions; enqueue or run inline).

Config

Daycry\Jobs\Config

Config\Jobs — handler/backend maps, security allowlists, retry/timeout settings, and init() to register scheduled jobs.

Support

Daycry\Jobs\Libraries, Daycry\Jobs\Metrics

Cross-cutting helpers: CircuitBreaker, RateLimiter, DeadLetterQueue, serializers, and the MetricsCollectorInterface collectors.


Core value objects

The layers talk to each other only through these small, mostly immutable objects:

  • JobDefinition (readonly): what the job is — handler key, payload, scheduling, retry policy, identity. Every withXxx() helper returns a new instance, so a definition can be shared across enqueue sites without spooky action at a distance.

  • JobEnvelope / wire payload: how the definition travels through a queue. The canonical wire object is:

    { "job", "payload", "queue", "priority", "maxRetries", "attempts", "name", "identifier", "idempotencyKey", "schedule", "_sig" }
    

    Built by EnvelopeFactory::toWire(), identical across every backend so a message enqueued via one path can be consumed via another.

  • JobLease (readonly): a reservation on an in-flight message —

    • token: opaque backend handle needed to ack/nack later (DB primary key, Redis processing entry, Service Bus LockToken, Beanstalk job id);

    • ownerToken: a random token minted by the worker at fetch() so a reaped-then-reassigned message cannot be acked by the previous owner;

    • expiresAt: the visibility deadline. After it passes the backend may redeliver; a long-running worker can renew() it.

  • JobContext (readonly): what a handler seespayload, name, queue, attempt, meta. Handlers never receive scheduling or queue state.

  • ExecutionResult: the outcome of one handler run — success, output, error, startedAt, endedAt (and durationSeconds()), and the resolved handlerClass.


Queue pipeline (worker)

QueueWorker::processOnce($queue) drives one message end to end:

  1. fetchbackend->fetch($queue) leases one ready message, or returns null (status empty).

  2. verify — when verifyEnvelopeSignature is on and a key is configured, the HMAC _sig is verified over the canonical identity JSON with hash_equals(). A tampered/forged message is abandon()ed (status rejected). An envelope whose payload is not a valid object is also rejected.

  3. idempotency — if the envelope carries an idempotencyKey, IdempotencyGuard::firstRun() short-circuits a duplicate: ack() without running (status skipped-idempotent).

  4. run one attemptJobRuntime::run(definition, context) resolves the handler (enforcing the per-queue allowlist), acquires the single-instance lock if requested, applies the timeout, captures output, and returns an ExecutionResult. It runs once and never sleeps.

  5. decide (the retry decision lives here, not in the runtime):

    • success → ack() (status acked),

    • failure with retries left (attemptIndex < maxRetries) → nack($lease, $delay) where $delay comes from the RetryPolicy (status requeued); the backend requeues,

    • retries exhausted → abandon() (status dead-lettered). abandon() routes to the backend’s native dead-letter where one exists (Beanstalk bury / Service Bus after MaxDeliveryCount) and otherwise just drops the message (Database/Redis). The app-level DeadLetterQueue helper is opt-in and is not invoked by the worker.

Total runs for a job are therefore maxRetries + 1. Backoff is realised by the backend honouring nack($lease, $delaySeconds). See Retries.

                    ┌───────────────────────── QueueWorker::processOnce($queue) ─────────────────────────┐
                    │                                                                                     │
 fetch(lease) ──►  verify signature  ──►  idempotency guard  ──►  run ONE attempt (JobRuntime)           │
     │                   │                       │                        │                              │
   null              tampered                 duplicate                   │                              │
     │                   │                       │            ┌───────────┼───────────────┐              │
     ▼                   ▼                       ▼            ▼           ▼                 ▼              │
  "empty"           abandon()                 ack()        success   failure (retries)  retries exhausted │
                  "rejected"          "skipped-idempotent"   │            │                 │             │
                                                            ack()     nack(delay)        abandon()        │
                                                          "acked"   "requeued"        "dead-lettered"     │
                                                                  (backend requeues)  native DLQ or drop  │
                                                                                      (app DLQ is opt-in) │
                    └─────────────────────────────────────────────────────────────────────────────────┘

The CLI command jobs:queue:work wraps this loop with a circuit breaker, per-queue rate limits and graceful shutdown. See CLI Commands and Concurrency.


Cron pipeline

  1. jobs:cronjob:run builds a Scheduler and calls Config\Jobs::init($scheduler).

  2. CronRunner::run() walks the definitions in topological order of dependsOn() (Scheduler::getExecutionOrder() performs the sort and throws on circular/unknown dependencies).

  3. For each definition it skips disabled jobs (enabled()) and jobs outside the current environments(), then evaluates the cron expression against “now” (or a frozen -testTime).

  4. A due definition with a queue is enqueued via BackendFactory::make(...)->enqueue(); otherwise it runs inline through JobRuntime (one attempt). The runner never sleeps between jobs.

See Scheduling.


The QueueBackend contract

A single contract underpins all five backends:

interface QueueBackend
{
    /** Persist the definition into the queue. Returns the backend-assigned id. */
    public function enqueue(JobDefinition $definition): string;

    /** Lease one ready message, or null when the queue is empty (after any blocking timeout). */
    public function fetch(string $queue): ?JobLease;

    /** Mark the leased message processed; the backend removes it permanently. */
    public function ack(JobLease $lease): bool;

    /** Redeliver the message, optionally after $delaySeconds (retry backoff). */
    public function nack(JobLease $lease, ?int $delaySeconds = null): bool;

    /** Stop holding the lease without retrying (unprocessable / DLQ). */
    public function abandon(JobLease $lease): bool;

    /** Reclaim messages whose lease expired (crashed/stalled worker). Returns the count recovered. */
    public function reapExpired(string $queue, int $visibilityTimeout): int;
}

The contract is stateless with respect to the in-flight message: the worker holds the JobLease and passes it back to ack/nack/abandon, so the backend never has to remember which message a given worker is processing. This replaces the legacy split between a queue interface and a worker interface that coupled the two sides through backend instance state.

Backend

Notes

SyncBackend

enqueue() runs the job inline via JobRuntime and returns a synthetic sync-... id; fetch() is always null, the lease verbs are no-ops returning true. Ideal for tests/CLI.

DatabaseBackend

Atomic reservation; reapExpired() returns rows past the visibility timeout to a runnable state; requeue is an in-place UPDATE (no orphan rows).

RedisBackend

Reliable-queue pattern (waiting/processing lists) with owner tokens; nack(delay) uses a delayed ZSET; reapExpired() recovers stalled leases.

BeanstalkBackend

nack() re-serialises the payload with attempts+1 via delete + re-put (the native release verb cannot mutate the job body); the requested delay is applied on the re-put. abandon() buries the job (native DLQ). Per-job TTR makes the server recover stalled jobs natively (reapExpired() is a no-op).

ServiceBusBackend

Peek-lock with serviceBusLockTimeout; the broker redelivers on lock expiry.

reapExpired() is exercised by jobs:queue:reap for the Database and Redis backends; Beanstalk and Service Bus recover natively. See Queues & Workers.


At-least-once delivery

Every persistent backend provides at-least-once delivery, not exactly-once. A message can be delivered (and a handler can therefore run) more than once. This happens by design when:

  • a worker crashes after running a job but before ack() — the lease expires and the reaper (or the broker) makes the message eligible again;

  • a nack() requeues a failed attempt for retry;

  • a visibility timeout elapses while a slow job is still running (mitigated by setting the timeout above the maximum expected runtime, or by JobLease::renew()).

Implications for your handlers:

  1. Make handlers idempotent. Running twice should not corrupt state (use upserts, conditional writes, or dedupe on a natural key).

  2. Use idempotencyKey() for hard dedupe. When a definition carries an idempotencyKey, EnvelopeFactory::toWire() serialises it onto the envelope (as a signed identity field) and the worker, via IdempotencyGuard, skips a message whose key was already processed within idempotencyTtl — it acks without running (status skipped-idempotent). This is opt-in. Because delivery is at-least-once and the dedupe is best-effort under crash/redelivery, still keep handlers idempotent at the application level. See Idempotency in depth.

  3. Size visibility timeouts correctly. databaseVisibilityTimeout / redisProcessingVisibilityTimeout / serviceBusLockTimeout must exceed the longest job, or a live job may be reclaimed and run concurrently. See Configuration.

Warning: Owner tokens guard ack/nack so a previous owner cannot ack a reaped-and-reassigned message, but they do not prevent two runs of the same payload — that is the nature of at-least-once. Idempotency is the application-level guarantee.


Execution & resilience

  • One attempt per fetchJobRuntime runs the handler exactly once; the worker owns the retry decision. This eliminates the legacy double-retry (a coordinator loop and a requeue helper both consuming maxRetries).

  • Timeout that interruptsTimeout installs a SIGALRM handler that throws (with pcntl_async_signals(true)), so even CPU-bound code is interrupted at the deadline. Without pcntl it degrades to a documented soft check.

  • Idempotency — opt-in IdempotencyGuard deduplicates by key (TTL idempotencyTtl).

  • Single-instance lockSingleInstanceLock uses an ownership token so a release never frees a lock held by a different owner; contention surfaces as a failed result that the worker requeues.

  • Circuit breaker & rate limitjobs:queue:work skips a failing backend (circuitBreakerThreshold / circuitBreakerCooldown) and honours queueRateLimits.

  • Graceful shutdown — SIGTERM/SIGINT finish the current cycle and exit; the in-flight job is not aborted mid-run.

See Concurrency and Retries.


Security

  • Envelope signingEnvelopeFactory::toWire() signs the immutable identity fields with HMAC-SHA256 at enqueue; the worker rejects tampered/forged messages. Key resolution: Config\Jobs::$signingKeyenv('JOBS_SIGNING_KEY') → Encryption key.

  • Per-queue handler allowlistHandlerRegistry::resolveForQueue() refuses handler keys not listed for a queue (Config\Jobs::$queueHandlers).

  • ShellHandler deny-by-default — an empty $allowedShellCommands rejects everything; execution is via proc_open with an argv array (never /bin/sh -c).

  • EventHandler allowlist — only $allowedEvents may be fired (empty = deny all).

  • UrlHandler anti-SSRF — http/https only, private/reserved IPv4/IPv6 rejected, SSL verification forced, redirects disabled.

See Security.


Extending the architecture

  1. Custom handler — implement JobHandlerInterface (or extend AbstractJobHandler / TypedJobHandler) and register the key in Config\Jobs::$handlers. See Handlers.

  2. Custom backend — implement QueueBackend and register it in Config\Jobs::$backends. See Queues & Workers.

  3. Custom retry policy — implement RetryPolicy and inject it into QueueWorker. See Retries.

  4. Metrics — implement MetricsCollectorInterface and set Config\Jobs::$metricsCollector. See Configuration.