Skip to main content

BatchContext Reference

Immutable context records providing progress information during batch operations. Ratchet provides two context types:

  • BatchContext -- progress during batch job execution (how many items have completed/failed).
  • StreamingBatchContext -- progress during the stream consumption phase (how many items have been read and chunked).

Package: run.ratchet.api

BatchContext

A Java record representing the state and progress of a batch operation. Passed to progress hooks and workflow conditions during batch execution.

public record BatchContext(
UUID batchId,
int totalItems,
int completedItems,
int failedItems
)

Record Components

ComponentTypeDescription
batchIdUUIDUUIDv7 identifier of the batch parent job
totalItemsintTotal number of child jobs in the batch
completedItemsintNumber of child jobs completed successfully
failedItemsintNumber of child jobs that have failed

Computed Methods

isComplete

public boolean isComplete()

Returns true when all items have been processed (completed + failed >= total).

if (ctx.isComplete()) {
log.info("Batch {} is done", ctx.batchId());
}

percentDone

public int percentDone()

Calculates the completion percentage as (completedItems * 100) / totalItems. Returns 100 for empty batches.

Returns: an integer between 0 and 100.

int pct = ctx.percentDone();
log.info("Batch progress: {}%", pct);

successRate

public double successRate()

Calculates the success rate as completedItems / (completedItems + failedItems). Returns 1.0 if no items have been processed yet.

Returns: a double between 0.0 and 1.0.

if (ctx.successRate() < 0.9) {
log.warn("Batch {} success rate below 90%", ctx.batchId());
}

Usage in Progress Hooks

scheduler.enqueueBatch("Process Orders")
.forEach(orders, order -> processOrder(order))
.onProgress(ctx -> {
log.info("Batch {} progress: {}/{} done, {} failed, {}% complete",
ctx.batchId(),
ctx.completedItems(),
ctx.totalItems(),
ctx.failedItems(),
ctx.percentDone());

if (ctx.successRate() < 0.5 && ctx.completedItems() + ctx.failedItems() > 10) {
log.warn("Success rate dropped below 50%");
}
})
.submit();

Usage in Workflow Conditions

scheduler.enqueueBatch("Data Import")
.forEach(rows, row -> importRow(row))
// Perfect batch -- zero failures
.thenWhenBatch(ctx -> ctx.failedItems() == 0 && ctx.isComplete(),
() -> markImportComplete())
// Acceptable -- at least 95% success
.thenWhenBatch(ctx -> ctx.successRate() >= 0.95 && ctx.isComplete(),
() -> acceptWithWarnings())
// Unacceptable -- more than half failed
.thenWhenBatch(ctx -> ctx.failedItems() > ctx.totalItems() / 2,
() -> rollbackImport())
.submit();

StreamingBatchContext

A Java record providing progress information during the streaming phase -- when items are being read from the source stream and inserted as child jobs in chunks.

public record StreamingBatchContext(
UUID batchId,
int processedItems,
int chunksInserted
)

Record Components

ComponentTypeDescription
batchIdUUIDUUIDv7 identifier of the batch parent job being created
processedItemsintCumulative number of items read from the stream
chunksInsertedintNumber of bulk insert operations performed

Convenience Methods

itemsStreamed

public int itemsStreamed()

Alias for processedItems(). Returns the number of items read from the stream so far.

int streamed = ctx.itemsStreamed(); // same as ctx.processedItems()

insertOperations

public int insertOperations()

Alias for chunksInserted(). Returns the number of bulk insert operations performed.

int ops = ctx.insertOperations(); // same as ctx.chunksInserted()

Usage

scheduler.<Long>streamingBatch("Migrate Users")
.fromStream(userRepository.streamAllUserIds())
.process(userId -> migrationService.migrateUser(userId))
.withChunkSize(500)
.onProgress(ctx -> {
log.info("Streaming: {} items read, {} chunks inserted",
ctx.processedItems(), ctx.chunksInserted());

// Estimate database load
int estimatedRows = ctx.chunksInserted() * 500;
log.debug("Approximately {} child jobs created", estimatedRows);
})
.start();

BatchContext vs StreamingBatchContext

AspectBatchContextStreamingBatchContext
PhaseJob executionStream consumption (job creation)
Total knownYes (totalItems)No (stream not yet exhausted)
TracksCompleted + failed itemsItems streamed + chunks inserted
Success rateAvailable via successRate()N/A (jobs haven't run yet)
CompletionisComplete() availableN/A
Used inonProgress(), workflow conditionsonProgress() on StreamingBatchBuilder

Lifecycle

  1. Stream consumption -- StreamingBatchContext is passed to StreamingBatchBuilder.onProgress() as items are read and chunked.
  2. Job execution -- BatchContext is passed to BatchBuilder.onProgress() (or StreamingBatchBuilder.onBatchProgress()) as child jobs complete.
scheduler.<Long>streamingBatch("Full Pipeline")
.fromStream(largeDataset.stream())
.process(item -> processItem(item))
.withChunkSize(1000)
// Phase 1: Stream consumption progress
.onProgress(ctx -> log.info("Reading: {} items in {} chunks",
ctx.processedItems(), ctx.chunksInserted()))
// Phase 2: Execution progress
.onBatchProgress(ctx -> log.info("Executing: {}% complete, {} failed",
ctx.percentDone(), ctx.failedItems()))
.start();

See Also