BatchBuilder
Builders for processing collections of items as coordinated batch jobs. Ratchet provides two batch builders:
BatchBuilder -- for in-memory collections where the total size is known up front.
StreamingBatchBuilder<T> -- for large datasets read from a Stream, processed in chunks without loading everything into memory.
Package: run.ratchet.api
BatchBuilder
Obtained from JobSchedulerService.enqueueBatch().
forEach
<T extends Serializable> BatchBuilder forEach(
Collection<T> items, SerializableConsumer<T> action)
Applies an action to each item in the collection. Each item becomes a child job in the batch.
Type Parameters:
T -- the type of elements; must implement Serializable.
Parameters:
items -- the collection of items to process.
action -- the operation to perform on each item.
Returns: the BatchBuilder for chaining.
List<Long> orderIds = List.of(1L, 2L, 3L, 4L, 5L);
scheduler.enqueueBatch("Process Orders")
.forEach(orderIds, orderId -> orderService.process(orderId))
.submit();
onProgress
BatchBuilder onProgress(SerializableConsumer<BatchContext> hook)
Registers a progress hook invoked as child jobs complete. The hook receives a BatchContext snapshot with current progress metrics.
Parameters:
hook -- a consumer receiving BatchContext updates.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Import Records")
.forEach(records, record -> importService.importRecord(record))
.onProgress(ctx -> {
log.info("Batch {}: {}% complete ({}/{} items, {} failed)",
ctx.batchId(), ctx.percentDone(),
ctx.completedItems(), ctx.totalItems(), ctx.failedItems());
})
.submit();
thenOnBatchSuccess
BatchBuilder thenOnBatchSuccess(SerializableCheckedRunnable next)
Schedules a job to execute when all child jobs complete successfully (zero failures).
Parameters:
next -- the task to execute on batch success.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Nightly Sync")
.forEach(accounts, acct -> syncService.sync(acct))
.thenOnBatchSuccess(() -> notificationService.sendSyncComplete())
.submit();
thenOnBatchFailure
BatchBuilder thenOnBatchFailure(SerializableCheckedRunnable next)
Schedules a job to execute when one or more child jobs fail.
Parameters:
next -- the task to execute on batch failure.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Data Migration")
.forEach(rows, row -> migrationService.migrate(row))
.thenOnBatchFailure(() -> alertService.migrationPartiallyFailed())
.submit();
thenWhenBatch
BatchBuilder thenWhenBatch(
SerializablePredicate<BatchContext> condition,
SerializableCheckedRunnable next)
Schedules a job when a custom condition on the BatchContext is met.
Parameters:
condition -- predicate evaluating the BatchContext.
next -- the task to execute when the condition is true.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Process Items")
.forEach(items, item -> processItem(item))
.thenWhenBatch(ctx -> ctx.failedItems() == 0 && ctx.isComplete(),
() -> archiveResults())
.thenWhenBatch(ctx -> ctx.failedItems() > ctx.totalItems() / 2,
() -> rollbackProcessing())
.submit();
thenWhenSuccessRate
BatchBuilder thenWhenSuccessRate(double minRate, SerializableCheckedRunnable next)
Schedules a job when the success rate meets or exceeds the specified threshold.
Parameters:
minRate -- minimum success rate (0.0 to 1.0).
next -- the task to execute when the success rate condition is met.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Email Campaign")
.forEach(recipients, r -> emailService.sendCampaign(r))
.thenWhenSuccessRate(0.95, () -> log.info("Campaign delivered successfully"))
.thenWhenSuccessRate(0.50, () -> alertService.campaignPartialFailure())
.submit();
thenWhenFailureCount
BatchBuilder thenWhenFailureCount(int maxFailures, SerializableCheckedRunnable next)
Schedules a job when the number of failures reaches the specified threshold.
Parameters:
maxFailures -- the failure count that triggers the action.
next -- the task to execute when the failure count is reached.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Import Data")
.forEach(rows, row -> importRow(row))
.thenWhenFailureCount(10, () -> alertService.tooManyImportFailures())
.submit();
thenBranch
BatchBuilder thenBranch(WorkflowCondition condition,
SerializableCheckedRunnable next,
String description)
Adds a workflow branch with an explicit WorkflowCondition and description.
Parameters:
condition -- the WorkflowCondition determining when this branch fires.
next -- the task to execute.
description -- human-readable description for monitoring.
Returns: the BatchBuilder for chaining.
scheduler.enqueueBatch("Complex Batch")
.forEach(items, item -> processItem(item))
.thenBranch(
WorkflowCondition.batchCustom(ctx -> ctx.failedItems() > 5 && ctx.isComplete()),
() -> escalateToOps(),
"Escalate when more than 5 items fail")
.submit();
submit
JobHandle submit()
Submits the configured batch for execution.
Returns: a JobHandle for the batch parent job.
JobHandle handle = scheduler.enqueueBatch("My Batch")
.forEach(items, item -> processItem(item))
.submit();
log.info("Batch submitted with ID {}", handle.id());
StreamingBatchBuilder
Obtained from JobSchedulerService.streamingBatch(). Designed for large datasets where items are read from a Stream and inserted in configurable chunks.
fromStream
<U extends Serializable> StreamingBatchBuilder<U> fromStream(Stream<U> stream)
Sets the input data source for the batch.
Type Parameters:
U -- the type of items; must implement Serializable.
Parameters:
stream -- the stream of items to process.
Returns: the StreamingBatchBuilder for chaining.
scheduler.<Long>streamingBatch("Process Users")
.fromStream(userRepository.streamAllIds())
// ...
process
StreamingBatchBuilder<T> process(SerializableCheckedConsumer<T> action)
Configures the processing logic applied to each item. The action can throw checked exceptions.
Parameters:
action -- the processing action for each item.
Returns: the StreamingBatchBuilder for chaining.
scheduler.<Long>streamingBatch("Migrate Users")
.fromStream(userIds.stream())
.process(userId -> migrationService.migrateUser(userId))
// ...
withChunkSize
StreamingBatchBuilder<T> withChunkSize(int size)
Sets the number of items per database insert chunk. Default is 500. Tune this based on your database's bulk insert performance.
Parameters:
size -- items per chunk; must be positive.
Returns: the StreamingBatchBuilder for chaining.
scheduler.<Record>streamingBatch("Bulk Import")
.fromStream(records.stream())
.process(record -> importRecord(record))
.withChunkSize(2000)
// ...
onProgress
StreamingBatchBuilder<T> onProgress(Consumer<StreamingBatchContext> hook)
Registers a progress hook called during the stream consumption phase (not during job execution). Receives a StreamingBatchContext with streaming progress.
Parameters:
hook -- a consumer receiving StreamingBatchContext updates.
Returns: the StreamingBatchBuilder for chaining.
scheduler.<Long>streamingBatch("Stream Import")
.fromStream(dataStream)
.process(item -> processItem(item))
.onProgress(ctx -> log.info("Streamed {} items in {} chunks",
ctx.processedItems(), ctx.chunksInserted()))
// ...
onBatchProgress
StreamingBatchBuilder<T> onBatchProgress(SerializableConsumer<BatchContext> hook)
Registers a progress hook called during job execution (after streaming is complete). Receives a BatchContext with execution progress.
Parameters:
hook -- a consumer receiving BatchContext updates.
Returns: the StreamingBatchBuilder for chaining.
scheduler.<Long>streamingBatch("Process Stream")
.fromStream(ids.stream())
.process(id -> processId(id))
.onBatchProgress(ctx -> log.info("Execution: {}% complete", ctx.percentDone()))
// ...
Workflow Methods
StreamingBatchBuilder supports the same workflow methods as BatchBuilder:
StreamingBatchBuilder<T> thenOnBatchSuccess(SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenOnBatchFailure(SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenWhenBatch(
SerializablePredicate<BatchContext> condition, SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenWhenFailureCount(int maxFailures, SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenWhenSuccessRate(double minRate, SerializableCheckedRunnable next)
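As a sketch, the workflow methods compose with the streaming methods the same way they do on BatchBuilder. The repository and service names below (recordRepository, cleanupService, alertService) are illustrative assumptions, not part of the Ratchet API:

```java
JobHandle handle = scheduler.<Long>streamingBatch("Streamed Cleanup")
    .fromStream(recordRepository.streamStaleIds())   // assumed data source
    .process(id -> cleanupService.purge(id))         // assumed processing service
    // Workflow branches evaluate against the BatchContext during execution:
    .thenWhenSuccessRate(0.99, () -> log.info("Cleanup effectively complete"))
    .thenWhenFailureCount(25, () -> alertService.cleanupDegraded())
    .start();
```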
start
JobHandle start()
Starts the streaming batch operation. The stream is consumed, items are inserted in chunks, and child jobs are created.
Returns: a JobHandle for the batch parent job.
JobHandle handle = scheduler.<Long>streamingBatch("Full Migration")
.fromStream(repository.streamAll())
.process(id -> migrationService.migrate(id))
.withChunkSize(1000)
.onProgress(ctx -> log.info("Streamed {} items", ctx.processedItems()))
.thenOnBatchSuccess(() -> log.info("Migration complete"))
.thenOnBatchFailure(() -> alertService.migrationFailed())
.start();
BatchBuilder vs StreamingBatchBuilder
| Aspect | BatchBuilder | StreamingBatchBuilder |
|---|---|---|
| Input | Collection<T> (in memory) | Stream<T> (lazy) |
| Memory | Entire collection loaded | Chunked, constant memory |
| Total known at start | Yes | No (not known until the stream is exhausted) |
| Progress during creation | N/A | StreamingBatchContext |
| Progress during execution | BatchContext | BatchContext |
| Best for | Small-medium collections | Large datasets, database cursors |
| Submit method | submit() | start() |
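To illustrate the trade-off in the table, the same logical job can be written against either builder. This is a hedged sketch; documentIds, documentRepository, and indexService are placeholder names for your own data source and processing logic:

```java
// In-memory version: fine while the id list comfortably fits in memory.
scheduler.enqueueBatch("Reindex Documents")
    .forEach(documentIds, id -> indexService.reindex(id))
    .submit();

// Streaming version: same work at roughly constant memory,
// with ids read lazily (e.g. from a database cursor).
scheduler.<Long>streamingBatch("Reindex Documents")
    .fromStream(documentRepository.streamAllIds())
    .process(id -> indexService.reindex(id))
    .withChunkSize(1000)   // tune to your database's bulk insert sweet spot
    .start();
```

Note the differing terminal calls: submit() for the in-memory builder, start() for the streaming one.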