Clustering
Ratchet is designed to run on multiple nodes without additional coordination infrastructure. The database serves as the shared state, SKIP LOCKED ensures safe concurrent job claiming, and scheduler_lock provides singleton execution for recurring scans. For enhanced responsiveness, the ClusterCoordinator SPI enables cross-node wakeup notifications.
Multi-Node Architecture
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Node A │ │ Node B │ │ Node C │
│ │ │ │ │ │
│ Poller │ │ Poller │ │ Poller │
│ Workers │ │ Workers │ │ Workers │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ SKIP LOCKED │ SKIP LOCKED │ SKIP LOCKED
│ │ │
└────────────────┼────────────────┘
│
┌──────┴──────┐
│ Database │
│ │
│ scheduler_ │
│ job │
│ scheduler_ │
│ node │
│ scheduler_ │
│ lock │
└─────────────┘
Each node runs its own Poller, claims its own subset of jobs, and executes them independently. No node is special -- there is no "master" or "coordinator" node. The database is the single source of truth.
How SKIP LOCKED Provides Cluster Safety
The core guarantee: no two nodes will claim the same job. This is enforced at the database level through SELECT ... FOR UPDATE SKIP LOCKED:
-- Node A runs:
SELECT * FROM scheduler_job
WHERE status = 'PENDING' AND scheduled_time <= NOW()
ORDER BY (priority + age_boost) DESC, scheduled_time ASC
FOR UPDATE SKIP LOCKED
LIMIT 50;
-- Node B runs the same query simultaneously:
-- Jobs locked by Node A are silently skipped
-- Node B gets a disjoint set of jobs
SQL stores use SKIP LOCKED, and the MongoDB store uses atomic document updates. These are the foundation of Ratchet's multi-node execution -- no external coordination service is required for ordinary job claiming.
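For illustration, an atomic claim on MongoDB might look like the following sketch (the collection and field names here are assumptions, not Ratchet's actual schema):
// Sketch only: "jobs" is a MongoCollection<Document>; field names are assumed.
Document claimed = jobs.findOneAndUpdate(
        Filters.and(
                Filters.eq("status", "PENDING"),
                Filters.lte("scheduledTime", new Date())),
        Updates.combine(
                Updates.set("status", "RUNNING"),
                Updates.set("pickedBy", nodeId)),
        new FindOneAndUpdateOptions()
                .sort(Sorts.descending("priority"))
                .returnDocument(ReturnDocument.AFTER));
// findOneAndUpdate is atomic per document: if two nodes race, only one of them
// observes the PENDING status, so they can never claim the same job.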
StartupCoordinator
Destructive startup tasks are coordinated separately from wakeups. Ratchet's default StartupCoordinator uses a store-backed lease so only one node performs recurring-annotation orphan cleanup during startup:
@Incubating
public interface StartupCoordinator {
    boolean tryAcquire(String actionName, Duration leaseTtl);
    void release(String actionName);
}
This is distinct from ClusterCoordinator: startup cleanup uses store-backed leases by default, while ClusterCoordinator remains optional and wakeup-focused.
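A usage sketch (the action name is a hypothetical example; the engine performs this wiring internally):
// Sketch only: guard a destructive startup task behind the lease.
if (startupCoordinator.tryAcquire("recurring-orphan-cleanup", Duration.ofMinutes(5))) {
    try {
        // ... perform the one-time cleanup on this node only ...
    } finally {
        startupCoordinator.release("recurring-orphan-cleanup");
    }
}
// Nodes that lose the race simply skip the task; the lease TTL frees the
// lock if the winner crashes mid-cleanup.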
ClusterCoordinator SPI
While SKIP LOCKED guarantees correctness, it doesn't guarantee responsiveness. Without coordination, a job submitted on Node A won't be noticed by Node B until Node B's next poll cycle (which could be up to 60 seconds in deep idle).
The ClusterCoordinator SPI bridges this gap:
@Incubating
public interface ClusterCoordinator {
    void notifyNewWork(JobPriority priority);
    void registerWakeupListener(Runnable listener);
}
How It Works
- When a job is submitted with immediate or CRITICAL priority, the engine calls ClusterCoordinator.notifyNewWork(priority)
- The coordinator broadcasts this notification to all cluster nodes
- Each node's Poller has a wakeup listener registered via registerWakeupListener()
- When the notification arrives, the Poller exits deep idle and enters burst mode (500ms polling)
Default: NoOpClusterCoordinator
The default implementation does nothing -- wakeup notifications are only delivered locally. This is correct for single-node deployments and acceptable for multi-node deployments where sub-second latency isn't critical.
With the no-op coordinator, jobs submitted on Node A will be picked up by Node B at the next natural poll cycle. The adaptive polling algorithm ensures this happens within seconds during active periods, or up to 60 seconds during deep idle.
Implementing a ClusterCoordinator
You can implement ClusterCoordinator using any messaging technology available in your environment:
JMS-based (Jakarta EE native): publish a best-effort wakeup message to a shared topic and invoke registered listeners from a message-driven bean or other container-managed consumer.
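A minimal sketch under those assumptions -- the topic JNDI name and the consumer wiring are placeholders, not part of Ratchet:
// Sketch only: the topic name and lookup are hypothetical.
@Alternative
@Priority(APPLICATION)
@ApplicationScoped
public class JmsClusterCoordinator implements ClusterCoordinator {

    @Inject
    private JMSContext jms;

    @Resource(lookup = "java:/jms/topic/RatchetWakeup") // placeholder JNDI name
    private Topic wakeupTopic;

    private volatile Runnable wakeupListener;

    @Override
    public void notifyNewWork(JobPriority priority) {
        // Best-effort broadcast: a lost message only delays pickup to the next poll
        jms.createProducer().send(wakeupTopic, priority.name());
    }

    @Override
    public void registerWakeupListener(Runnable listener) {
        this.wakeupListener = listener;
    }

    // Called by a message-driven bean (or other container-managed consumer)
    // subscribed to the same topic on every node:
    public void onWakeupMessage() {
        Runnable listener = wakeupListener;
        if (listener != null) {
            listener.run();
        }
    }
}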
Infinispan/JGroups-based:
@Alternative
@Priority(APPLICATION)
@ApplicationScoped
@Listener // assumes "ratchet-wakeup" is a replicated cache, so writes fire events on every node
public class InfinispanClusterCoordinator implements ClusterCoordinator {

    @Inject
    private CacheContainer cacheContainer;

    private volatile Runnable wakeupListener;

    @Override
    public void notifyNewWork(JobPriority priority) {
        // Write to the replicated cache; the entry propagates to every cluster member
        cacheContainer.getCache("ratchet-wakeup")
                .put("wakeup-" + System.currentTimeMillis(), priority.name());
    }

    @Override
    public void registerWakeupListener(Runnable listener) {
        this.wakeupListener = listener;
        // Subscribe this instance so cache writes trigger onWakeup()
        cacheContainer.getCache("ratchet-wakeup").addListener(this);
    }

    @CacheEntryCreated
    public void onWakeup(CacheEntryCreatedEvent<Object, Object> event) {
        if (event.isPre()) {
            return; // react once, after the entry is committed
        }
        Runnable listener = wakeupListener;
        if (listener != null) {
            listener.run(); // pull the local Poller out of deep idle
        }
    }
}
The SPI is intentionally minimal -- just notifyNewWork and registerWakeupListener -- so it can be implemented over any pub/sub mechanism.
Node Identity
Each node needs a unique identifier for job claiming (picked_by), distributed locks, and heartbeat registration. The NodeIdentityProvider SPI provides this:
@Incubating
public interface NodeIdentityProvider {
    String getNodeId();
}
Default: DefaultNodeIdentityProvider
The default implementation constructs the node ID from hostname + PID, providing uniqueness across machines and across multiple instances on the same machine.
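In sketch form (the actual implementation may differ in details such as fallbacks and formatting):
// Sketch of a hostname + PID identity; the exact format is an assumption.
String host;
try {
    host = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
    host = "unknown-host"; // fallback when the hostname cannot be resolved
}
String nodeId = host + "-" + ProcessHandle.current().pid(); // e.g. "app-01-4242"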
Custom Implementation
For environments where hostname-based IDs aren't unique or stable (e.g., containers with synthetic hostnames), provide a custom implementation:
@Alternative
@Priority(APPLICATION)
@ApplicationScoped
public class KubernetesNodeIdentityProvider implements NodeIdentityProvider {

    private final String nodeId;

    public KubernetesNodeIdentityProvider() {
        // Use the Kubernetes pod name as the node ID
        this.nodeId = System.getenv("HOSTNAME");
    }

    @Override
    public String getNodeId() {
        return nodeId;
    }
}
The node ID must be:
- Unique across all nodes in the cluster
- Stable for the lifetime of the application instance
- Reasonable length (stored in VARCHAR(64) columns)
Node Registration and Heartbeat
Nodes register themselves in the scheduler_node table on startup and periodically refresh their heartbeat_ts timestamp. The DynamicHeartbeatCalculator adjusts the heartbeat interval based on system load.
scheduler_node:
node_id │ heartbeat_ts │ started_at
──────────┼──────────────────────┼──────────────────────
node-a │ 2024-03-15 10:00:05 │ 2024-03-15 09:58:00
node-b │ 2024-03-15 10:00:03 │ 2024-03-15 09:58:30
node-c │ 2024-03-15 09:55:00 │ 2024-03-15 09:40:00
Orphan Recovery
If a node crashes without gracefully shutting down, its jobs remain in RUNNING status with picked_by set to the dead node. The OrphanRecoveryTimer detects these orphaned jobs:
- Scans for RUNNING jobs whose picked_by node hasn't heartbeated recently
- Resets orphaned jobs to PENDING status
- Clears the picked_by and picked_at fields
- The jobs become eligible for pickup by any healthy node
This ensures jobs are never permanently lost due to node failures. The recovery interval and stale threshold are configurable.
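Conceptually, a recovery pass looks like this sketch (the store method names are hypothetical stand-ins for the store SPI calls the real timer makes):
// Sketch only: nodeStore/jobStore methods are assumed names.
Instant staleBefore = Instant.now().minus(staleThreshold);
for (String deadNode : nodeStore.findNodesWithHeartbeatBefore(staleBefore)) {
    // RUNNING -> PENDING; clears picked_by and picked_at so any node can reclaim
    jobStore.resetOrphanedJobs(deadNode);
}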
Distributed Locking
Certain operations must execute on only one node at a time:
- DLQ purge
- Recurring job scheduling
- Job archiving
Ratchet uses database-backed distributed locks via the LockStore SPI:
boolean acquired = lockStore.tryLock(
        "dlqPurger",
        Duration.ofMinutes(10),
        nodeIdentityProvider.getNodeId()
);
The lock is stored in the scheduler_lock table with:
- Lock name (e.g., "dlqPurger")
- Owning node ID
- Expiration time (auto-releases if the holder crashes)
Locks expire automatically after the specified duration, preventing deadlocks if the holder crashes without releasing.
Recurring Job Coordination
Recurring jobs require special handling in a cluster. Without coordination, multiple nodes would each create the next recurring execution, resulting in duplicate runs.
Ratchet handles this through the RecurringScheduler, which uses:
- Business key uniqueness: Each recurring job's business key (from @Recurring.id() or the programmatic withBusinessKey()) is active-unique. Only one active (PENDING/RUNNING) job can exist with a given business key (see the sketch after this list).
- Distributed locking: The recurring scheduler acquires a lock before scheduling the next execution, preventing race conditions between nodes.
- Orphan annotation maintenance: The RecurringAnnotationMaintenanceService cleans up recurring jobs from removed @Recurring annotations on redeployment.
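For orientation, here is a sketch of the programmatic path; only withBusinessKey() is taken from this page, while the builder entry point and terminal call are assumed names:
// Sketch only: newJob(...) and submit() are hypothetical method names.
scheduler.newJob(HourlyReportTask.class)
        .withBusinessKey("hourly-report") // active-unique across the cluster
        .submit();
// If an active (PENDING/RUNNING) job with this business key already exists,
// the active-unique constraint prevents a duplicate from being created.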
Redeployment Handling
When a new version is deployed with changed @Recurring annotations:
- New recurring jobs are registered for new or modified annotations
- Old recurring jobs for removed annotations are canceled via cancelRecurringJobByBusinessKey()
- The business key ensures at most one active instance during the transition
// Cancel recurring jobs by tag (e.g., during a feature toggle)
int canceledByTag = scheduler.cancelRecurringJobsByTag("deprecated-feature");

// Cancel by business key (e.g., when replacing a specific schedule)
int canceledByKey = scheduler.cancelRecurringJobByBusinessKey("hourly-report");
Graceful Shutdown
The SchedulerLifecycleManager and DrainController coordinate graceful shutdown:
- Drain mode: The DrainController signals the Poller to stop claiming new jobs
- In-flight completion: Already-running jobs are allowed to finish within a timeout
- Cache cleanup: JobTask.clearCaches() releases classloader references
- Poller stop: The Poller's scheduling loop terminates
- Heartbeat stop: Node heartbeat ceases
This prevents the "half-executed job" problem during rolling deployments. Kubernetes readiness probes can check the drain controller's status to remove the node from the load balancer before shutdown begins.
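For example, a JAX-RS readiness endpoint might expose that status as follows (the DrainController accessor name is an assumption):
// Sketch only: isDraining() is a hypothetical accessor on DrainController.
@Path("/ready")
@ApplicationScoped
public class ReadinessResource {

    @Inject
    private DrainController drainController;

    @GET
    public Response ready() {
        // Report not-ready once draining starts so Kubernetes stops routing traffic
        return drainController.isDraining()
                ? Response.status(Response.Status.SERVICE_UNAVAILABLE).build()
                : Response.ok().build();
    }
}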
Consistency Guarantees
| Guarantee | Mechanism |
|---|---|
| No duplicate execution | SKIP LOCKED + @Version optimistic locking |
| At-least-once delivery | Orphan recovery resets stale RUNNING jobs |
| Idempotency | idempotency_key UNIQUE constraint |
| Active-unique business key | Partial unique index on active statuses |
| Single recurring instance | Business key + distributed lock |
| Crash recovery | Orphan recovery timer + lock expiration |
Ratchet provides at-least-once delivery semantics. In rare cases (node crash after execution but before status update), a job may execute twice. If your job logic requires exactly-once semantics, implement idempotency in your business logic (e.g., using the job's idempotency key).
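A sketch of that pattern (the context accessor and the processed-key store are hypothetical stand-ins for your own wiring):
// Sketch only: JobContext.getIdempotencyKey() and processedKeys are assumed names.
public void execute(JobContext ctx) {
    String key = ctx.getIdempotencyKey();
    if (!processedKeys.markIfAbsent(key)) { // atomic "first writer wins" check
        return; // duplicate delivery after a crash: skip the side effect
    }
    chargeCustomer(ctx); // the non-idempotent side effect runs at most once per key
}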
Related
- Architecture Overview -- Module structure and SPI overview
- Execution Model -- Store-backed claiming details
- Persistence -- Node table, lock table, UUIDv7 generation
- Scheduling -- Recurring job scheduling and adaptive polling