
Clustering

Ratchet can run on multiple nodes against the same store. Ordinary job claiming is coordinated through the database: SKIP LOCKED ensures no two nodes claim the same job, and store-backed leases serialize destructive startup cleanup. ClusterCoordinator remains optional and is only needed for fast cross-node wakeups.

Architecture

     ┌──────────┐     ┌──────────┐     ┌──────────┐
     │  Node A  │     │  Node B  │     │  Node C  │
     │          │     │          │     │          │
     │  Poller  │     │  Poller  │     │  Poller  │
     │  Workers │     │  Workers │     │  Workers │
     └────┬─────┘     └────┬─────┘     └────┬─────┘
          │                │                │
          │ SKIP LOCKED    │ SKIP LOCKED    │ SKIP LOCKED
          │                │                │
          └────────────────┼────────────────┘
                           │
                    ┌──────┴──────┐
                    │  Database   │
                    │             │
                    │ scheduler_  │
                    │   job       │
                    │ scheduler_  │
                    │   node      │
                    │ scheduler_  │
                    │   lock      │
                    └─────────────┘

Every node runs its own poller and workers. For ordinary job claiming, no node is special — the database is the single source of truth.

How Job Claiming Works

When a node's poller fires, it runs a SELECT ... FOR UPDATE SKIP LOCKED query:

SELECT * FROM scheduler_job
WHERE status = 'PENDING'
AND scheduled_time <= NOW()
ORDER BY (priority + age_boost) DESC, scheduled_time ASC
FOR UPDATE SKIP LOCKED
LIMIT :batchSize;

  • FOR UPDATE locks the selected rows
  • SKIP LOCKED skips rows already locked by another node's in-flight claim

This means each node gets a disjoint set of jobs — no duplicate execution, no distributed lock manager, no coordination protocol. The database handles it.

Optimized Claiming

Ratchet provides two claim paths:

Method                                   Returns                         Use case
claimNextBatch(limit, nodeId)            Full JobEntity with payload     When you need the complete job immediately
claimNextBatchOptimized(limit, nodeId)   Lightweight JobClaimDto         When you want to claim fast and load payloads lazily

The optimized path skips deserializing large payload blobs during the claim query, reducing lock hold time in high-throughput clusters.
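
As a rough sketch of how the optimized path might be used (the store handle, loadPayload call, and worker submission are assumptions for illustration, not confirmed API):

// Claim quickly without payloads, then hydrate each job just before execution.
List<JobClaimDto> claims = jobStore.claimNextBatchOptimized(20, nodeId);   // jobStore: assumed handle
for (JobClaimDto claim : claims) {
    JobEntity job = jobStore.loadPayload(claim.getJobId());   // hypothetical lazy-load call
    workers.submit(job);                                      // hand off to this node's worker pool
}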

Recurring Job Claiming

Recurring master jobs have their own claim method:

claimDueRecurring(int limit, String nodeId)

This selects recurring jobs whose next_fire_time has arrived, claims them, and the engine creates a child instance for each execution cycle.
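
A hedged sketch of that cycle; only claimDueRecurring comes from the API above, the surrounding engine call is a stand-in:

List<JobEntity> masters = jobStore.claimDueRecurring(10, nodeId);
for (JobEntity master : masters) {
    // Hypothetical engine call: create the child instance for this execution cycle,
    // after which the master's next_fire_time presumably moves forward so other nodes skip it.
    engine.spawnChildInstance(master);
}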

Node Identity and Heartbeats

Each node registers itself in the scheduler_node table:

Column           Description
node_id          Unique identifier (hostname, UUID, or configured value)
last_heartbeat   Last time this node checked in
started_at       When the node first registered
node_info        Optional JSON metadata (version, IP, etc.)

The engine calls NodeStore.upsertHeartbeat(nodeId, timestamp) periodically. This creates the record on first call and updates it thereafter.
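
For illustration, a node might refresh its heartbeat on a fixed schedule like this (the 30-second interval and executor wiring are assumptions; only upsertHeartbeat is from the NodeStore API above):

ScheduledExecutorService heartbeats = Executors.newSingleThreadScheduledExecutor();

// Refresh this node's scheduler_node row periodically; the first call creates it.
heartbeats.scheduleAtFixedRate(
        () -> nodeStore.upsertHeartbeat(nodeId, Instant.now()),
        0, 30, TimeUnit.SECONDS);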

Failure Detection

Stale heartbeats indicate a dead or partitioned node:

// Find nodes that haven't heartbeated in 5 minutes
Instant cutoff = Instant.now().minus(Duration.ofMinutes(5));
List<NodeEntity> dead = nodeStore.findInactiveNodesSince(cutoff);

// Clean up dead node records
nodeStore.deleteInactiveNodesSince(cutoff);

Jobs that were claimed by a dead node (status RUNNING, picked_by set to the dead node) can be reclaimed after the timeout expires — the poller will pick them up on the next cycle.

Clock Skew

The NodeStore.getDatabaseTime() method returns the database server's current timestamp. Nodes can compare this against their local clock to detect skew. Significant skew (>1 second) should be logged as a warning, since scheduling accuracy depends on clocks being reasonably synchronized.
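
A minimal skew check along those lines (logger and threshold handling are illustrative):

Instant dbTime = nodeStore.getDatabaseTime();
Duration skew = Duration.between(dbTime, Instant.now()).abs();

// Warn when local and database clocks drift apart by more than one second.
if (skew.compareTo(Duration.ofSeconds(1)) > 0) {
    log.warn("Clock skew between this node and the database: {} ms", skew.toMillis());
}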

Cluster Wakeup Notifications

By default, each node polls on its own interval. For time-sensitive jobs (CRITICAL priority, immediate submissions), Ratchet supports cross-node wakeup via the ClusterCoordinator SPI:

public interface ClusterCoordinator {

    /** Broadcast to other nodes that new, time-sensitive work has been enqueued. */
    void notifyNewWork(JobPriority priority);

    /** Register a callback to run when a wakeup notification arrives from another node. */
    void registerWakeupListener(Runnable listener);
}

How Wakeup Works

  1. A job is submitted on Node A with CRITICAL priority (or .immediate())
  2. After the transaction commits, JobWakeupService publishes a JobWakeupNotification
  3. The ClusterCoordinator broadcasts it across nodes (JGroups, JMS, Redis pub/sub, etc.)
  4. All nodes receive the notification and immediately wake their pollers
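
In code, the two sides of this flow reduce to the SPI calls below; the poller handle is a stand-in for whatever each node uses to cut its poll wait short:

// Receiving side (every node): wake the local poller when any node signals new work.
clusterCoordinator.registerWakeupListener(poller::wakeNow);   // poller.wakeNow() is illustrative

// Submitting side (Node A), invoked by JobWakeupService after the enqueue transaction commits.
clusterCoordinator.notifyNewWork(JobPriority.CRITICAL);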

Wakeup Triggers

Not every job triggers a wakeup. The JobWakeupService only notifies for:

  • CRITICAL priority jobs
  • Single jobs submitted with zero delay via user-triggered enqueue
  • Batch parent jobs (to start child distribution quickly)

Normal and low-priority jobs wait for the next poll cycle.
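
The decision roughly amounts to the check below; the job accessors are illustrative stand-ins that mirror the trigger list, not the actual implementation:

// Only wake the cluster for work that cannot wait for the next poll cycle.
boolean shouldWake = job.getPriority() == JobPriority.CRITICAL
        || (job.isUserTriggered() && job.getDelay().isZero())
        || job.isBatchParent();
if (shouldWake) {
    clusterCoordinator.notifyNewWork(job.getPriority());
}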

Default: No-Op

Out of the box, Ratchet uses NoOpClusterCoordinator. That is fine for any deployment that can tolerate poll-interval latency for cross-node wakeups, because correctness still comes from the store.

Example: JGroups Implementation

@ApplicationScoped
@Alternative
@Priority(1000)
public class JGroupsClusterCoordinator implements ClusterCoordinator {

    private JChannel channel;
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();

    @PostConstruct
    void init() throws Exception {
        channel = new JChannel("jgroups-config.xml");
        channel.setReceiver(new Receiver() {   // JGroups 5 Receiver; ReceiverAdapter was removed in 5.x
            @Override
            public void receive(Message msg) {
                // Any cluster message is treated as a wakeup signal for the local poller.
                listeners.forEach(Runnable::run);
            }
        });
        channel.connect("ratchet-cluster");
    }

    @Override
    public void notifyNewWork(JobPriority priority) {
        try {
            channel.send(new ObjectMessage(null, "WAKEUP"));
        } catch (Exception e) {
            // Log and continue — wakeup is best-effort
        }
    }

    @Override
    public void registerWakeupListener(Runnable listener) {
        listeners.add(listener);
    }
}

Priority Boosting

Long-waiting low-priority jobs get promoted automatically. Each claim orders by raw priority plus floor(wait_minutes / priorityBoostIntervalMinutes), where the interval defaults to 15 minutes. With the default, a job that has waited 45 minutes sorts as if its priority were 3 higher.

Boosting is part of claim ordering only; persisted priority is not rewritten. Set RatchetOptions.builder().store(s -> s.priorityBoostIntervalMinutes(0)) to disable the boost.

Distributed Locks

The scheduler_lock table provides advisory locks for operations that must be cluster-wide singletons (e.g., recurring job registration, maintenance tasks):

Column        Description
lock_name     Unique lock identifier
locked_by     Node that holds the lock
locked_at     When the lock was acquired
expires_at    TTL — lock auto-expires for crash safety

For MongoDB, the lock collection uses a TTL index on expires_at, so expired locks are garbage-collected automatically.
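
A sketch of how a cluster-singleton task could use such a lock; the LockStore methods below are assumptions modeled on the columns above, not a confirmed API:

// Hypothetical acquire/release API shaped after the scheduler_lock columns.
boolean acquired = lockStore.tryAcquire("recurring-registration", nodeId, Duration.ofMinutes(2));
if (acquired) {
    try {
        registerRecurringJobs();   // runs on exactly one node at a time
    } finally {
        lockStore.release("recurring-registration", nodeId);
    }
}
// If not acquired, another node holds the lock; expires_at frees it automatically after a crash.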

Sizing Guidelines

Cluster size    Poll interval    Batch size    Notes
1 node          1–5s             10–50         Single-node, no coordination needed
2–5 nodes       2–5s             10–20         SKIP LOCKED handles contention well
5–20 nodes      5–10s            5–10          Reduce batch size to spread work evenly
20+ nodes       10–30s           5             Consider ClusterCoordinator for latency-sensitive work

The key tradeoff: shorter poll intervals mean lower latency but more database load. ClusterCoordinator lets you keep a longer default interval while still responding immediately to urgent work.
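
As a configuration sketch of that tradeoff, reusing the RatchetOptions builder shown earlier (the poll-interval and batch-size option names are assumptions):

// Assumed option names; only the builder().store(...) shape appears elsewhere in these docs.
RatchetOptions options = RatchetOptions.builder()
        .store(s -> s
                .pollIntervalSeconds(10)   // longer interval keeps steady-state database load low
                .claimBatchSize(10))       // smaller batches spread work more evenly across nodes
        .build();
// Pair this with a ClusterCoordinator so urgent jobs still start without waiting out the interval.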

See Also