JobBuilder
Fluent builder API for creating and configuring individual jobs. JobBuilder is obtained from JobSchedulerService.enqueue() or schedule() and provides methods for retry policies, priorities, timeouts, workflows, callbacks, tags, and parameters.
Package: run.ratchet.api
Type: Interface
Basic Usage
JobHandle handle = scheduler.enqueue(() -> orderService.process(orderId))
.withPriority(JobPriority.HIGH)
.withMaxRetries(3)
.withBackoff(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(10))
.withTimeout(Duration.ofMinutes(5))
.withTags("order-processing", "customer-123")
.withParam("orderId", String.valueOf(orderId))
.onSuccess(ctx -> log.info("Order {} processed", orderId))
.onFailure((ctx, error) -> alertService.sendAlert(error))
.submit();
Configuration Methods
withPriority
JobBuilder withPriority(JobPriority priority)
Sets the execution priority. When multiple jobs are queued, higher-priority jobs are executed before lower-priority ones.
Parameters:
priority -- the priority level. One of LOWEST, LOW, NORMAL (default), HIGH, CRITICAL.
scheduler.enqueue(() -> criticalService.process())
.withPriority(JobPriority.CRITICAL)
.submit();
withMaxRetries
JobBuilder withMaxRetries(int retries)
Sets the maximum number of retry attempts after failure. The default is 0 (no retries).
Parameters:
retries -- maximum retry count. Must be non-negative.
scheduler.enqueue(() -> externalApi.call())
.withMaxRetries(5)
.submit();
withBackoff
JobBuilder withBackoff(BackoffPolicy policy, Duration param)
Configures the backoff strategy applied between retry attempts. Only relevant when maxRetries > 0.
Parameters:
policy -- the backoff strategy: NONE (immediate retry), FIXED (constant delay), or EXPONENTIAL (doubling delay).
param -- the base delay duration. For FIXED, this is the constant delay. For EXPONENTIAL, this is the initial delay.
// Exponential backoff starting at 2 seconds
scheduler.enqueue(() -> paymentService.charge(amount))
.withMaxRetries(4)
.withBackoff(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(2))
.submit();
// Retries at: 2s, 4s, 8s, 16s
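The retry schedule in the example above follows from doubling the base delay on each attempt. The sketch below reproduces that arithmetic under the assumption that the delay before retry n is base × 2^(n-1), with no jitter and no upper cap (the library may apply either). `BackoffSketch` and its `Policy` enum are hypothetical stand-ins, not Ratchet types.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the three backoff strategies; not part of Ratchet. */
final class BackoffSketch {

    /** Local stand-in for BackoffPolicy so the sketch compiles on its own. */
    enum Policy { NONE, FIXED, EXPONENTIAL }

    /** Delay before the given retry attempt (1-based). Assumes no jitter and no cap. */
    static Duration delayFor(Policy policy, Duration base, int attempt) {
        if (attempt < 1 || attempt > 62) {
            throw new IllegalArgumentException("attempt must be between 1 and 62");
        }
        switch (policy) {
            case NONE:
                return Duration.ZERO;          // retry immediately
            case FIXED:
                return base;                   // same delay every time
            case EXPONENTIAL:
                // base * 2^(attempt - 1): 1x, 2x, 4x, 8x, ...
                return base.multipliedBy(1L << (attempt - 1));
            default:
                throw new AssertionError(policy);
        }
    }

    /** Delays for retries 1..maxRetries, in order. */
    static List<Duration> schedule(Policy policy, Duration base, int maxRetries) {
        List<Duration> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            delays.add(delayFor(policy, base, attempt));
        }
        return delays;
    }

    public static void main(String[] args) {
        // prints [PT2S, PT4S, PT8S, PT16S]
        System.out.println(schedule(Policy.EXPONENTIAL, Duration.ofSeconds(2), 4));
    }
}
```

With FIXED and the same arguments, every entry would be 2 seconds; with NONE, every entry would be zero.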
withTimeout
JobBuilder withTimeout(Duration timeout)
Sets the maximum execution duration. Jobs exceeding this limit are forcibly terminated and marked as failed.
Parameters:
timeout -- the maximum execution duration. Use Duration.ZERO to disable the timeout.
scheduler.enqueue(() -> longRunningImport.execute())
.withTimeout(Duration.ofMinutes(30))
.submit();
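To make the timeout behaviour described above concrete, here is a minimal sketch built on the standard `java.util.concurrent` classes. It assumes that "forcibly terminated" means the worker thread is interrupted, which is cooperative in Java; how Ratchet actually stops a job is not stated here. `TimeoutSketch` and its `Status` enum are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical illustration of a per-job timeout; not part of Ratchet. */
final class TimeoutSketch {

    enum Status { SUCCEEDED, FAILED }

    /** Runs the task, failing it if it exceeds the timeout. Duration.ZERO disables the limit. */
    static Status runWithTimeout(Runnable task, Duration timeout) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> future = executor.submit(task);
        try {
            if (timeout.isZero()) {
                future.get();                                   // wait indefinitely
            } else {
                future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            }
            return Status.SUCCEEDED;
        } catch (TimeoutException e) {
            future.cancel(true);                                // interrupt the worker thread
            return Status.FAILED;
        } catch (ExecutionException e) {
            return Status.FAILED;                               // the task itself threw
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            future.cancel(true);
            return Status.FAILED;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Runnable slow = () -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();             // stop when cancelled
            }
        };
        System.out.println(runWithTimeout(slow, Duration.ofMillis(100)));     // FAILED
        System.out.println(runWithTimeout(() -> { }, Duration.ofSeconds(1))); // SUCCEEDED
    }
}
```

Because interruption is cooperative, a long-running task should check for it (or call interruptible methods) so that a timeout can actually stop it.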
withTags
JobBuilder withTags(String... tags)
Adds one or more tags to the job. Tags are trimmed, converted to lowercase, and stored only if non-null and non-blank. Tags enable filtering and categorization.
Parameters:
tags -- one or more tag strings.
scheduler.enqueue(() -> reportService.generate(reportId))
.withTags("reports", "finance", "q4-2024")
.submit();
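The normalization rules stated for withTags (trim, lowercase, drop null or blank entries) can be sketched as follows. `TagNormalizationSketch` is a hypothetical helper; whether the library also removes duplicates or uses a particular locale for lowercasing is not stated, so the sketch keeps duplicates and assumes `Locale.ROOT`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical illustration of tag normalization; not part of Ratchet. */
final class TagNormalizationSketch {

    /** Trims and lowercases each tag, skipping null and blank entries. */
    static List<String> normalize(String... tags) {
        List<String> normalized = new ArrayList<>();
        if (tags == null) {
            return normalized;
        }
        for (String tag : tags) {
            if (tag == null || tag.trim().isEmpty()) {
                continue;                       // null or blank: not stored
            }
            // Locale.ROOT is an assumption; it avoids locale-specific case rules.
            normalized.add(tag.trim().toLowerCase(Locale.ROOT));
        }
        return normalized;
    }

    public static void main(String[] args) {
        // prints [reports, finance, q4-2024]
        System.out.println(normalize(" Reports ", null, "", "FINANCE", "   ", "q4-2024"));
    }
}
```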
withParam
JobBuilder withParam(String key, String value)
Adds a key-value parameter accessible during execution via JobContext.param(). Parameters provide lightweight configuration without serializing complex objects.
Parameters:
key -- parameter key. Must not be null or blank.
value -- parameter value. Must not be null.
scheduler.enqueue(() -> emailService.send())
.withParam("recipient", "user@example.com")
.withParam("template", "welcome")
.submit();
// Inside the job:
// String recipient = JobContext.current().param("recipient");
withResource
JobBuilder withResource(String resourceName)
Specifies a shared resource that this job requires. The job will acquire a permit from the resource pool before execution. If no permits are available, the job is rescheduled with a delay.
This enables limiting concurrent access to shared resources regardless of job type (e.g., limiting concurrent API calls to a payment gateway to 5 total).
Parameters:
resourceName -- the name of the resource to acquire. If null or blank, no resource limiting is applied.
scheduler.enqueue(() -> paymentGateway.charge(paymentId))
.withResource("payment-api")
.submit();
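The acquire-or-reschedule behaviour described for withResource resembles a non-blocking semaphore acquire. The sketch below models it with `java.util.concurrent.Semaphore`. `ResourcePermitSketch`, its `register` method, and the `Outcome` enum are hypothetical; how resource pools and their permit counts are configured in Ratchet is not covered here, and treating an unregistered name as unrestricted is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

/** Hypothetical illustration of permit-based resource limiting; not part of Ratchet. */
final class ResourcePermitSketch {

    enum Outcome { EXECUTED, RESCHEDULED }

    private final Map<String, Semaphore> pools = new ConcurrentHashMap<>();

    /** Declares a named resource with a fixed number of permits (assumed configuration step). */
    void register(String resourceName, int permits) {
        pools.put(resourceName, new Semaphore(permits));
    }

    /** Runs the task if a permit is free; otherwise reports that it should be retried later. */
    Outcome tryRun(String resourceName, Runnable task) {
        boolean unlimited = resourceName == null || resourceName.trim().isEmpty();
        Semaphore pool = unlimited ? null : pools.get(resourceName);
        if (pool == null) {
            task.run();                        // no resource limiting applies
            return Outcome.EXECUTED;
        }
        if (!pool.tryAcquire()) {
            return Outcome.RESCHEDULED;        // no permit available right now
        }
        try {
            task.run();
            return Outcome.EXECUTED;
        } finally {
            pool.release();                    // always return the permit
        }
    }

    public static void main(String[] args) {
        ResourcePermitSketch sketch = new ResourcePermitSketch();
        sketch.register("payment-api", 1);
        // The inner call runs while the outer one still holds the only permit.
        Outcome outer = sketch.tryRun("payment-api", () ->
                System.out.println("inner: " + sketch.tryRun("payment-api", () -> { })));
        System.out.println("outer: " + outer);
    }
}
```

Running `main` prints `inner: RESCHEDULED` followed by `outer: EXECUTED`, because the single permit is held for the duration of the outer task.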
withIdempotencyKey
JobBuilder withIdempotencyKey(String key)
Overrides the auto-generated idempotency key. By default, a UUID is generated at builder creation time. A custom idempotency key is globally unique -- once used, that key is consumed forever, preventing duplicate job creation.
Parameters:
key -- the idempotency key. If null or blank, keeps the auto-generated UUID.
Difference from withBusinessKey:
idempotencyKey is UNIQUE globally -- once used, that key is consumed forever.
businessKey only blocks active (PENDING/RUNNING) jobs -- allows re-runs after completion.
// Webhook handler: same delivery ID = same job forever
scheduler.enqueue(() -> webhookHandler.process(payload))
.withIdempotencyKey(webhookDeliveryId)
.submit();
withBusinessKey
JobBuilder withBusinessKey(String key)
Sets a business key for preventing concurrent execution against the same entity. Unlike withIdempotencyKey, the business key allows multiple completed jobs with the same key over time -- it only blocks when an active (PENDING/RUNNING) job exists with the same key.
Parameters:
key -- the business key. If null or blank, no concurrent execution blocking is applied.
// Only one sync per user at a time, re-runs allowed after completion
scheduler.enqueue(() -> syncService.syncUser(userId))
.withBusinessKey("sync-user-" + userId)
.submit();
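The difference between the two keys can be modelled with two sets: idempotency keys are never removed once used, while business keys are released when the job finishes. This is a conceptual sketch only. `KeySemanticsSketch` is hypothetical, it keeps state in memory rather than in the job store, and it does not model what the scheduler does with a rejected submission (for example, throwing or returning the existing job).

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of idempotency-key vs business-key semantics; not part of Ratchet. */
final class KeySemanticsSketch {

    private final Set<String> usedIdempotencyKeys = new HashSet<>();
    private final Set<String> activeBusinessKeys = new HashSet<>();

    /** True the first time a key is seen; false forever after. */
    boolean acceptIdempotencyKey(String key) {
        return usedIdempotencyKeys.add(key);
    }

    /** True unless a PENDING/RUNNING job currently holds the key. */
    boolean acceptBusinessKey(String key) {
        return activeBusinessKeys.add(key);
    }

    /** Called when the job holding the business key completes. */
    void complete(String businessKey) {
        activeBusinessKeys.remove(businessKey);
    }

    public static void main(String[] args) {
        KeySemanticsSketch keys = new KeySemanticsSketch();

        System.out.println(keys.acceptIdempotencyKey("delivery-42")); // true
        System.out.println(keys.acceptIdempotencyKey("delivery-42")); // false, consumed forever

        System.out.println(keys.acceptBusinessKey("sync-user-7"));    // true
        System.out.println(keys.acceptBusinessKey("sync-user-7"));    // false, job still active
        keys.complete("sync-user-7");
        System.out.println(keys.acceptBusinessKey("sync-user-7"));    // true, re-run allowed
    }
}
```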
immediate
JobBuilder immediate()
Marks the job for immediate execution notification. The scheduler publishes a wakeup to all cluster nodes, bypassing the normal adaptive polling delay.
Jobs with CRITICAL priority or zero delay are automatically treated as immediate. Use this method explicitly when you need immediate behavior for other configurations.
scheduler.enqueue(() -> urgentService.handle(alert))
.immediate()
.submit();
Callback Methods
onSuccess
JobBuilder onSuccess(SerializableConsumer<JobContext> s)
Registers a callback invoked after successful job completion. The callback receives the JobContext of the completed job.
Parameters:
s -- success callback accepting a JobContext.
scheduler.enqueue(() -> importService.importData(batchId))
.onSuccess(ctx -> log.info("Job {} completed", ctx.jobId()))
.submit();
onFailure
JobBuilder onFailure(SerializableBiConsumer<JobContext, Throwable> f)
Registers a callback invoked if the job fails. The callback receives both the JobContext and the Throwable that caused the failure.
Parameters:
f -- failure callback accepting a JobContext and Throwable.
scheduler.enqueue(() -> riskyService.execute())
.onFailure((ctx, error) -> {
alertService.page("Job " + ctx.jobId() + " failed: " + error.getMessage());
})
.submit();
Workflow Methods
then
JobBuilder then(SerializableCheckedRunnable next)
Adds a task to the sequential chain. Chain tasks execute in order after the primary task completes successfully. If any task in the chain fails, subsequent tasks are not executed (unless configured with workflow branches).
Parameters:
next -- the next task in the chain. Must not be null.
scheduler.enqueue(() -> validateData())
.then(() -> processData())
.then(() -> generateReport())
.then(() -> sendNotification())
.submit();
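The stop-on-first-failure rule for chains can be illustrated with a simple loop. This sketch ignores retries and workflow branches, and `ChainSketch` with its `CheckedTask` interface is a hypothetical stand-in for the scheduler's chain execution and `SerializableCheckedRunnable`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of sequential chain execution; not part of Ratchet. */
final class ChainSketch {

    /** Local stand-in for a runnable that may throw a checked exception. */
    @FunctionalInterface
    interface CheckedTask {
        void run() throws Exception;
    }

    /** Runs tasks in order and returns how many completed before the first failure. */
    static int runChain(List<CheckedTask> tasks) {
        int completed = 0;
        for (CheckedTask task : tasks) {
            try {
                task.run();
            } catch (Exception e) {
                break;                          // later steps are skipped
            }
            completed++;
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<CheckedTask> chain = new ArrayList<>();
        chain.add(() -> log.add("validate"));
        chain.add(() -> { throw new IllegalStateException("process failed"); });
        chain.add(() -> log.add("report"));

        System.out.println(runChain(chain));    // 1
        System.out.println(log);                // [validate]
    }
}
```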
thenOnSuccess
JobBuilder thenOnSuccess(SerializableCheckedRunnable next)
Schedules a separate job to execute if the current job succeeds. This creates a workflow branch with a SUCCESS condition. Unlike then(), this creates an independent job, not a chain step.
Parameters:
next -- the task to execute on success as a separate job.
scheduler.enqueue(() -> processPayment(orderId))
.thenOnSuccess(() -> sendReceipt(orderId))
.thenOnFailure(() -> refundPayment(orderId))
.submit();
thenOnFailure
JobBuilder thenOnFailure(SerializableCheckedRunnable next)
Schedules a separate job to execute if the current job fails (after all retries are exhausted). This creates a workflow branch with a FAILURE condition.
Parameters:
next -- the task to execute on failure as a separate job.
scheduler.enqueue(() -> processPayment(orderId))
.thenOnFailure(() -> alertTeam(orderId))
.submit();
when
<T> JobBuilder when(
SerializablePredicate<JobResult<T>> condition,
SerializableCheckedRunnable next)
Schedules a job to execute when a custom condition is met. The condition receives the full JobResult<T> of the current job.
Type Parameters:
T -- the type of the job result.
Parameters:
condition -- predicate evaluated against the JobResult.
next -- the task to execute when the condition is true.
scheduler.enqueue(() -> analyzeData())
.when(result -> result.isSuccess() && result.getExecutionTimeMs() > 60000,
() -> alertSlowExecution())
.when(result -> result.isFailure(),
() -> notifyAdmins())
.submit();
when (with priority)
<T> JobBuilder when(
SerializablePredicate<JobResult<T>> condition,
SerializableCheckedRunnable next,
int priority)
Same as when() but with an explicit evaluation priority. Lower priority numbers are evaluated first.
Parameters:
condition -- predicate evaluated against the JobResult.
next -- the task to execute when the condition is true.
priority -- evaluation priority (lower = higher priority, default is 0).
scheduler.enqueue(() -> processOrder())
.when(result -> result.isSuccess(), () -> confirmOrder(), 0) // evaluated first
.when(result -> result.isFailure(), () -> cancelOrder(), 1) // evaluated second
.submit();
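The "lower number is evaluated first" rule amounts to a sort on the priority value before the conditions are checked. The sketch below assumes that branches with equal priority keep their registration order and that every matching branch fires; neither detail is confirmed by this page. `BranchOrderSketch` and `Branch` are hypothetical, and the job result is reduced to a single success flag.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical illustration of priority-ordered branch evaluation; not part of Ratchet. */
final class BranchOrderSketch {

    /** A named branch with a condition on the job outcome and an evaluation priority. */
    static final class Branch {
        final String name;
        final Predicate<Boolean> condition;
        final int priority;

        Branch(String name, Predicate<Boolean> condition, int priority) {
            this.name = name;
            this.condition = condition;
            this.priority = priority;
        }
    }

    /** Returns the names of the branches that fire, in evaluation order. */
    static List<String> evaluate(List<Branch> branches, boolean jobSucceeded) {
        List<Branch> ordered = new ArrayList<>(branches);
        // List.sort is stable, so equal priorities keep registration order.
        ordered.sort(Comparator.comparingInt(b -> b.priority));
        List<String> fired = new ArrayList<>();
        for (Branch branch : ordered) {
            if (branch.condition.test(jobSucceeded)) {
                fired.add(branch.name);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Branch> branches = new ArrayList<>();
        branches.add(new Branch("cancelOrder", succeeded -> !succeeded, 1));
        branches.add(new Branch("confirmOrder", succeeded -> succeeded, 0));
        branches.add(new Branch("audit", succeeded -> true, 1));

        System.out.println(evaluate(branches, true));   // [confirmOrder, audit]
        System.out.println(evaluate(branches, false));  // [cancelOrder, audit]
    }
}
```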
whenResult
<T> JobBuilder whenResult(
SerializableFunction<T, Boolean> condition,
SerializableCheckedRunnable next)
Schedules a job based on the return value of the current job. The condition function receives only the value, not the full JobResult.
Type Parameters:
T -- the type of the job's return value.
Parameters:
condition -- function that evaluates the return value and returns a boolean.
next -- the task to execute when the condition returns true.
// The explicit parameter type lets the compiler infer T (assumes checkStock returns an Integer)
scheduler.enqueue(() -> inventoryService.checkStock(itemId))
.whenResult((Integer stock) -> stock > 100, () -> placeOrder(itemId))
.whenResult((Integer stock) -> stock > 0 && stock <= 100, () -> alertLowStock(itemId))
.whenResult((Integer stock) -> stock == 0, () -> markOutOfStock(itemId))
.submit();
branch
JobBuilder branch(WorkflowCondition condition,
SerializableCheckedRunnable next,
String description)
Adds a workflow branch with an explicit WorkflowCondition and a human-readable description for monitoring and debugging.
Parameters:
condition -- the workflow condition determining when this branch fires.
next -- the task to execute.
description -- human-readable description for logs and dashboards.
scheduler.enqueue(() -> analyzeMetrics())
.branch(WorkflowCondition.custom(r -> r.getExecutionTimeMs() > 30000),
() -> alertSlowJob(),
"Alert ops when analysis takes over 30 seconds")
.submit();
Submission
submit
JobHandle submit()
Submits the configured job (including the primary task, chain tasks, and workflow branches) to the scheduler for persistence and execution.
Returns: a JobHandle containing the unique job ID.
JobHandle handle = scheduler.enqueue(() -> processData())
.withMaxRetries(3)
.submit();
log.info("Submitted job {}", handle.id());
Accessor Methods
These methods allow reading the configured state of a builder. They are primarily used internally by the scheduler but are part of the public interface.
| Method | Return Type | Description |
|---|---|---|
| task() | SerializableCheckedRunnable | The primary task |
| chainTasks() | List<SerializableCheckedRunnable> | Immutable list of chained tasks |
| workflowBranches() | List<WorkflowBranch> | Immutable list of workflow branches |
| opts() | JobOptions | Current job options (defaults to JobOptions.defaults()) |
| params() | Map<String, String> | Unmodifiable parameter map |
| tags() | List<String> | Unmodifiable tag list |
| delay() | Duration | Delay before execution (never null, may be Duration.ZERO) |
| idempotencyKey() | String | Idempotency key (never null -- auto-generated UUID or custom) |
| businessKey() | String | Business key, or null if not set |
| resourceName() | String | Resource name, or null if not set |
| isImmediate() | boolean | Whether immediate wakeup is requested |
| onSuccess() | SerializableConsumer<JobContext> | Success callback, or null |
| onFailure() | SerializableBiConsumer<JobContext, Throwable> | Failure callback, or null |
Example: Complete Configuration
JobHandle handle = scheduler.enqueue(() -> paymentService.charge(orderId, amount))
// Execution options
.withPriority(JobPriority.HIGH)
.withMaxRetries(3)
.withBackoff(BackoffPolicy.EXPONENTIAL, Duration.ofSeconds(5))
.withTimeout(Duration.ofMinutes(2))
// Resource limiting
.withResource("payment-gateway")
// Identity
.withBusinessKey("charge-" + orderId)
.withIdempotencyKey(requestId)
// Metadata
.withTags("payments", "orders")
.withParam("orderId", String.valueOf(orderId))
.withParam("amount", amount.toString())
// Callbacks
.onSuccess(ctx -> log.info("Payment charged for order {}", ctx.param("orderId")))
.onFailure((ctx, err) -> alertService.paymentFailed(ctx.param("orderId"), err))
// Workflow branches
.thenOnSuccess(() -> fulfillmentService.startFulfillment(orderId))
.thenOnFailure(() -> orderService.markPaymentFailed(orderId))
// Immediate cluster notification
.immediate()
.submit();