BatchBuilder

Builders for processing collections of items as coordinated batch jobs. Ratchet provides two batch builders:

  • BatchBuilder -- for in-memory collections where the total size is known up front.
  • StreamingBatchBuilder<T> -- for large datasets read from a Stream, processed in chunks without loading everything into memory.

Package: run.ratchet.api
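
Both builders share the same fluent shape. As a quick orientation, here is a minimal sketch of the two entry points, reusing the scheduler, orderService, userRepository, and migrationService collaborators from the examples later on this page (the wiring of those beans is assumed):

// In-memory collection: size known up front, submitted with submit().
List<Long> orderIds = List.of(1L, 2L, 3L);
scheduler.enqueueBatch("Process Orders")
    .forEach(orderIds, orderId -> orderService.process(orderId))
    .submit();

// Stream-backed dataset: inserted in chunks, started with start().
scheduler.<Long>streamingBatch("Migrate Users")
    .fromStream(userRepository.streamAllIds())
    .process(userId -> migrationService.migrateUser(userId))
    .start();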

BatchBuilder

Obtained from JobSchedulerService.enqueueBatch().

forEach

<T extends Serializable> BatchBuilder forEach(
    Collection<T> items, SerializableConsumer<T> action)

Applies an action to each item in the collection. Each item becomes a child job in the batch.

Type Parameters:

  • T -- the type of elements; must implement Serializable.

Parameters:

  • items -- the collection of items to process.
  • action -- the operation to perform on each item.

Returns: the BatchBuilder for chaining.

List<Long> orderIds = List.of(1L, 2L, 3L, 4L, 5L);

scheduler.enqueueBatch("Process Orders")
.forEach(orderIds, orderId -> orderService.process(orderId))
.submit();

onProgress

BatchBuilder onProgress(SerializableConsumer<BatchContext> hook)

Registers a progress hook invoked as child jobs complete. The hook receives a BatchContext snapshot with current progress metrics.

Parameters:

  • hook -- a consumer receiving BatchContext updates.

Returns: the BatchBuilder for chaining.

scheduler.enqueueBatch("Import Records")
.forEach(records, record -> importService.importRecord(record))
.onProgress(ctx -> {
log.info("Batch {}: {}% complete ({}/{} items, {} failed)",
ctx.batchId(), ctx.percentDone(),
ctx.completedItems(), ctx.totalItems(), ctx.failedItems());
})
.submit();

thenOnBatchSuccess

BatchBuilder thenOnBatchSuccess(SerializableCheckedRunnable next)

Schedules a job to execute when all child jobs complete successfully (zero failures).

Parameters:

  • next -- the task to execute on batch success.
scheduler.enqueueBatch("Nightly Sync")
.forEach(accounts, acct -> syncService.sync(acct))
.thenOnBatchSuccess(() -> notificationService.sendSyncComplete())
.submit();

thenOnBatchFailure

BatchBuilder thenOnBatchFailure(SerializableCheckedRunnable next)

Schedules a job to execute when one or more child jobs fail.

Parameters:

  • next -- the task to execute on batch failure.
scheduler.enqueueBatch("Data Migration")
.forEach(rows, row -> migrationService.migrate(row))
.thenOnBatchFailure(() -> alertService.migrationPartiallyFailed())
.submit();

thenWhenBatch

BatchBuilder thenWhenBatch(
    SerializablePredicate<BatchContext> condition,
    SerializableCheckedRunnable next)

Schedules a job when a custom condition on the BatchContext is met.

Parameters:

  • condition -- predicate evaluating the BatchContext.
  • next -- the task to execute when the condition is true.
scheduler.enqueueBatch("Process Items")
.forEach(items, item -> processItem(item))
.thenWhenBatch(ctx -> ctx.failedItems() == 0 && ctx.isComplete(),
() -> archiveResults())
.thenWhenBatch(ctx -> ctx.failedItems() > ctx.totalItems() / 2,
() -> rollbackProcessing())
.submit();

thenWhenSuccessRate

BatchBuilder thenWhenSuccessRate(double minRate, SerializableCheckedRunnable next)

Schedules a job when the success rate meets or exceeds the specified threshold.

Parameters:

  • minRate -- minimum success rate (0.0 to 1.0).
  • next -- the task to execute when the success rate condition is met.
scheduler.enqueueBatch("Email Campaign")
.forEach(recipients, r -> emailService.sendCampaign(r))
.thenWhenSuccessRate(0.95, () -> log.info("Campaign delivered successfully"))
.thenWhenSuccessRate(0.50, () -> alertService.campaignPartialFailure())
.submit();

thenWhenFailureCount

BatchBuilder thenWhenFailureCount(int maxFailures, SerializableCheckedRunnable next)

Schedules a job when the number of failures reaches the specified threshold.

Parameters:

  • maxFailures -- the failure count that triggers the action.
  • next -- the task to execute when the failure count is reached.
scheduler.enqueueBatch("Import Data")
.forEach(rows, row -> importRow(row))
.thenWhenFailureCount(10, () -> alertService.tooManyImportFailures())
.submit();

thenBranch

BatchBuilder thenBranch(WorkflowCondition condition,
    SerializableCheckedRunnable next,
    String description)

Adds a workflow branch with an explicit WorkflowCondition and description.

Parameters:

  • condition -- the WorkflowCondition determining when this branch fires.
  • next -- the task to execute.
  • description -- human-readable description for monitoring.
scheduler.enqueueBatch("Complex Batch")
.forEach(items, item -> processItem(item))
.thenBranch(
WorkflowCondition.batchCustom(ctx -> ctx.failedItems() > 5 && ctx.isComplete()),
() -> escalateToOps(),
"Escalate when more than 5 items fail")
.submit();

submit

JobHandle submit()

Submits the configured batch for execution.

Returns: a JobHandle for the batch parent job.

JobHandle handle = scheduler.enqueueBatch("My Batch")
    .forEach(items, item -> processItem(item))
    .submit();

log.info("Batch submitted with ID {}", handle.id());

StreamingBatchBuilder

Obtained from JobSchedulerService.streamingBatch(). Designed for large datasets where items are read from a Stream and inserted in configurable chunks.

fromStream

<U extends Serializable> StreamingBatchBuilder<U> fromStream(Stream<U> stream)

Sets the input data source for the batch.

Type Parameters:

  • U -- the type of items; must implement Serializable.

Parameters:

  • stream -- the stream of items to process.
scheduler.<Long>streamingBatch("Process Users")
.fromStream(userRepository.streamAllIds())
// ...

process

StreamingBatchBuilder<T> process(SerializableCheckedConsumer<T> action)

Configures the processing logic applied to each item. The action can throw checked exceptions.

Parameters:

  • action -- the processing action for each item.
scheduler.<Long>streamingBatch("Migrate Users")
.fromStream(userIds.stream())
.process(userId -> migrationService.migrateUser(userId))
// ...

withChunkSize

StreamingBatchBuilder<T> withChunkSize(int size)

Sets the number of items per database insert chunk. Default is 500. Tune this based on your database's bulk insert performance.

Parameters:

  • size -- items per chunk. Must be positive.

Returns: the StreamingBatchBuilder for chaining.

scheduler.<Record>streamingBatch("Bulk Import")
    .fromStream(records.stream())
    .process(record -> importRecord(record))
    .withChunkSize(2000)
    // ...

onProgress

StreamingBatchBuilder<T> onProgress(Consumer<StreamingBatchContext> hook)

Registers a progress hook called during the stream consumption phase (not during job execution). Receives a StreamingBatchContext with streaming progress.

Parameters:

  • hook -- a consumer receiving StreamingBatchContext updates.
scheduler.<Long>streamingBatch("Stream Import")
.fromStream(dataStream)
.process(item -> processItem(item))
.onProgress(ctx -> log.info("Streamed {} items in {} chunks",
ctx.processedItems(), ctx.chunksInserted()))
// ...

onBatchProgress

StreamingBatchBuilder<T> onBatchProgress(SerializableConsumer<BatchContext> hook)

Registers a progress hook called during job execution (after streaming is complete). Receives a BatchContext with execution progress.

Parameters:

  • hook -- a consumer receiving BatchContext updates.
scheduler.<Long>streamingBatch("Process Stream")
.fromStream(ids.stream())
.process(id -> processId(id))
.onBatchProgress(ctx -> log.info("Execution: {}% complete", ctx.percentDone()))
// ...
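
Because both hooks return the builder, they can be combined on a single batch to observe both phases. A minimal sketch, reusing the dataStream and processItem placeholders from the examples above:

scheduler.<Long>streamingBatch("Two-Phase Progress")
    .fromStream(dataStream)
    .process(item -> processItem(item))
    // fires while the stream is consumed and chunks are inserted
    .onProgress(ctx -> log.info("Streaming: {} items inserted", ctx.processedItems()))
    // fires later, as child jobs execute
    .onBatchProgress(ctx -> log.info("Execution: {}% complete", ctx.percentDone()))
    .start();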

Workflow Methods

StreamingBatchBuilder supports the same workflow methods as BatchBuilder:

StreamingBatchBuilder<T> thenOnBatchSuccess(SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenOnBatchFailure(SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenWhenBatch(
    SerializablePredicate<BatchContext> condition, SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenWhenFailureCount(int maxFailures, SerializableCheckedRunnable next)
StreamingBatchBuilder<T> thenWhenSuccessRate(double minRate, SerializableCheckedRunnable next)
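
For illustration, a sketch combining these on a streaming batch; the accountRepository stream and the failure threshold are assumptions, while the other collaborators appear in the BatchBuilder examples above:

scheduler.<Long>streamingBatch("Nightly Account Sync")
    .fromStream(accountRepository.streamAllIds())   // assumed repository stream of account IDs
    .process(accountId -> syncService.sync(accountId))
    .thenWhenFailureCount(25, () -> alertService.tooManySyncFailures())   // assumed alert hook
    .thenWhenSuccessRate(0.99, () -> notificationService.sendSyncComplete())
    .start();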

start

JobHandle start()

Starts the streaming batch operation. The stream is consumed, items are inserted in chunks, and child jobs are created.

Returns: a JobHandle for the batch parent job.

JobHandle handle = scheduler.<Long>streamingBatch("Full Migration")
    .fromStream(repository.streamAll())
    .process(id -> migrationService.migrate(id))
    .withChunkSize(1000)
    .onProgress(ctx -> log.info("Streamed {} items", ctx.processedItems()))
    .thenOnBatchSuccess(() -> log.info("Migration complete"))
    .thenOnBatchFailure(() -> alertService.migrationFailed())
    .start();

BatchBuilder vs StreamingBatchBuilder

Aspect                       BatchBuilder                  StreamingBatchBuilder
---------------------------  ----------------------------  --------------------------------
Input                        Collection<T> (in memory)     Stream<T> (lazy)
Memory                       Entire collection loaded      Chunked, constant memory
Total known at start         Yes                           No (stream not exhausted)
Progress during creation     N/A                           StreamingBatchContext
Progress during execution    BatchContext                  BatchContext
Best for                     Small-medium collections      Large datasets, database cursors
Submit method                submit()                      start()

See Also