Concurrency & Resilience¶
When you run more than one worker (or one worker pulling a fast queue), several jobs execute at the same time. v3 ships three mechanisms to keep concurrent execution safe and to protect the worker and downstream systems from overload:
Single-instance locking — guarantee a named job never runs twice at once.
Circuit breaker — stop hammering a backend that is failing.
Rate limiting — cap how many jobs a queue processes per minute.
This page explains how each works, how it is wired, and how to configure it. See Configuration for the property reference and Security for the trust-boundary controls.
Single-instance locking¶
A job marked singleInstance() must never have two attempts running concurrently — for example a
nightly aggregation that would corrupt data if it overlapped with itself.
The mechanism is Daycry\Jobs\Execution\SingleInstanceLock, an ownership-token cache lock:
only the holder that acquired the lock can release it.
How it is wired in JobRuntime¶
JobRuntime::run() acquires the lock before invoking the handler and releases it in a finally
block, so a thrown exception or timeout still frees the lock:
$lockName = $context->name ?? $definition->name ?? $definition->handler;
$lockOwner = '';
$locked = false;
if ($definition->singleInstance) {
$lockOwner = bin2hex(random_bytes(16)); // ownership token unique to this attempt
if (! $this->lock->acquire($lockName, $lockOwner, max(120, $timeoutSeconds + 60))) {
// Already held by another attempt -> fail this one so the worker requeues it.
return new ExecutionResult(false, null,
"single-instance job '{$lockName}' is already running", $start, microtime(true), $handlerClass);
}
$locked = true;
}
try {
// beforeRun / handle (under timeout) / afterRun ...
} finally {
if ($locked) {
$this->lock->release($lockName, $lockOwner);
}
}
Key points:
Lock name is the logical job name (
context->name, elsedefinition->name, else the handler key). Two definitions with the same name share a lock.Ownership token is a fresh 16-byte random value per attempt.
release()only deletes the key when the stored owner matches, so a lock that was reaped and reassigned to another worker is never freed by the previous owner.TTL is
max(120, timeoutSeconds + 60)seconds. The lock outlives the job’s timeout, so a crashed worker’s lock eventually expires instead of wedging the job forever.
Contention behaviour → requeue¶
When acquire() returns false (the lock is held by a different owner), the attempt does not
block or sleep. It returns a failed ExecutionResult. The worker then treats it like any other
failure:
with retries left →
nack(delay)so the message is requeued and tried again later (by which time the holder may have finished);with retries exhausted →
abandon().
Note: Because contention is surfaced as a normal failure, give single-instance jobs a few
maxRetriesand a backoff strategy so a transient overlap simply retries instead of dead-lettering. See Retries.
Atomicity caveat¶
public function acquire(string $name, string $owner, int $ttl): bool
{
$cache = $this->resolveCache();
$key = self::KEY_PREFIX . $name; // 'jobs_lock_' . $name
$current = $cache->get($key);
if ($current !== null && $current !== $owner) {
return false;
}
$cache->save($key, $owner, $ttl);
return true;
}
The check-then-set is not strictly atomic unless the cache driver provides native SET NX
semantics. Between the read and the write a competing process could squeeze in. The ownership token
still guarantees that a release() never frees a lock held by a different owner — the property that
matters for cleanup correctness — but for the strongest mutual exclusion use a Redis-backed cache.
Example¶
use Daycry\Jobs\Jobs;
Jobs::define('command', 'app:rebuild-search-index')
->named('rebuild-search-index') // the lock name
->queue('maintenance')
->singleInstance()
->maxRetries(3)
->dispatch();
You can also mark scheduled jobs single-instance in Config\Jobs::init():
$scheduler->define('command', 'app:aggregate')
->named('nightly-aggregate')
->dailyAt('01:00')
->singleInstance();
Circuit breaker¶
When the queue backend itself is failing (broker down, database unreachable), retrying every cycle
just wastes resources and floods logs. Daycry\Jobs\Libraries\CircuitBreaker short-circuits the
worker for a cooldown period after a run of failures.
States¶
State |
Meaning |
|---|---|
|
Normal operation. Failures are counted. |
|
Backend considered unavailable. Cycles are skipped for the cooldown window. |
|
Cooldown elapsed; one probe is allowed to test recovery. |
State is stored in the cache service, so it persists across worker restarts (keyed
circuit_<name>_*).
Transitions¶
In
closed, eachrecordFailure()increments a counter. When it reachesfailureThreshold, the circuit trips toopenand stampsopened_at.In
open,isAvailable()returnsfalseuntilcooldownSecondshave elapsed; then it transitions tohalf_openand allows one probe through (returnstrue).In
half_open, arecordSuccess()resets toclosed; arecordFailure()re-opens the circuit.
How the worker uses it¶
jobs:queue:work builds a breaker named queue_<queue> per cycle and checks it before fetching:
$breaker = new CircuitBreaker('queue_' . $queue,
$this->config->circuitBreakerThreshold,
$this->config->circuitBreakerCooldown);
if (! $breaker->isAvailable()) {
CLI::write("[Circuit Open] backend for '{$queue}' temporarily unavailable.", 'red');
return 'circuit-open'; // worker sleeps pollInterval, then retries
}
try {
$result = $worker->processOnce($queue);
$breaker->recordSuccess();
} catch (Throwable $e) {
$breaker->recordFailure(); // a thrown backend error counts toward the threshold
return 'error';
}
Only thrown backend errors (e.g. a connection failure inside processOnce()) count as failures.
A job whose handler fails is a normal failed attempt and does not trip the breaker — handler
failures are managed by the retry/DLQ path instead.
Configuration¶
Property |
Default |
Description |
|---|---|---|
|
|
Consecutive backend failures before the circuit opens. |
|
|
Seconds the circuit stays open before allowing a probe. |
public int $circuitBreakerThreshold = 5;
public int $circuitBreakerCooldown = 60;
When the circuit is open the worker sleeps pollInterval seconds between cycles (the circuit-open
status is treated like an empty queue for sleep purposes).
Rate limiting¶
Config\Jobs::$queueRateLimits caps how many jobs a queue may process per minute, protecting
fragile downstream systems (third-party APIs, mail relays) from bursts.
Daycry\Jobs\Libraries\RateLimiter implements a cache-based, per-minute token window. The worker
checks it at the start of every cycle:
$limit = $this->config->queueRateLimits[$queue] ?? 0;
if ($limit > 0 && ! (new RateLimiter())->allow($queue, $limit)) {
CLI::write("[Rate Limited] '{$queue}' reached {$limit} jobs/min.", 'yellow');
return 'rate-limited'; // worker sleeps pollInterval, then retries the cycle
}
Configuration¶
Property |
Type |
Default |
Description |
|---|---|---|---|
|
|
|
Max jobs per minute per queue. |
public array $queueRateLimits = [
'mailers' => 60, // at most 60 emails/min
'thirdparty' => 100,
];
Accuracy and the atomic vs fallback path¶
On cache drivers with atomic
increment()(Redis, Memcached, APCu) the counter is server-side atomic. A burst of workers may overshoot the limit by at most one before the next call throttles correctly — the standard token-bucket relaxation.On drivers without atomic increment (file/dummy/null)
RateLimiterfalls back to a best-effort read-modify-write that can undercount by one under a race.
Note: For accurate production rate limiting use a Redis or Memcached cache handler. The limit is a throttle, not a hard quota — treat small overshoots as expected.
RateLimiter also exposes getUsage($queue), reset($queue), and a throttle() variant that
throws JobException::forRateLimitExceeded() instead of returning a boolean.
Putting it together¶
A single worker cycle (jobs:queue:work) applies the controls in this order:
Rate limit — if the queue is over its per-minute cap, skip (
rate-limited).Circuit breaker — if the backend circuit is open, skip (
circuit-open).Fetch + process one message through
QueueWorker(which verifies the signature, applies the single-instance lock insideJobRuntime, runs one attempt, and acks/nacks/abandons).Record success/failure on the breaker.
On
empty,rate-limited,circuit-open, orerror, sleeppollIntervalseconds before the next cycle.
For the attempt/retry model see Attempts and Retries; for tampering and handler-execution controls see Security.