Implementing batch processing

Depending on several requirements, including the system capacity and business needs, a processor can process records differently.

For example, for real-time or near-real time processing, it is more interesting to process small batches of data more often. On the other hand, in case of one-time processing, it is more optimal to adapt the way the component handles batches of data according to the system capacity.

By default, the runtime automatically estimates a group size that it considers good, according to the system capacity, to process the data. This group size can sometimes be too big and not optimal for your needs or for your system to handle effectively and correctly.

Users can then customize this size from the component settings in Talend Studio, by specifying a maxBatchSize that adapts the size of each group of data to be processed.

The estimated group size logic is automatically implemented when a component is deployed to a Talend application. Besides defining the @BeforeGroup and @AfterGroup logic detailed below, no action is required on the implementation side of the component.

The component batch processes the data as follows:

  • Case 1 - No maxBatchSize is specified in the component configuration. The runtime estimates a group size of 4. Records are processed by groups of 4.

  • Case 2 - The runtime estimates a group size of 4 but a maxBatchSize of 3 is specified in the component configuration. The system adapts the group size to 3. Records are processed by groups of 3.

Processing schema (values are examples):

Batch processing

Each group is processed as follows until there is no record left:

  1. The @BeforeGroup method resets a record buffer at the beginning of each group.

  2. The records of the group are assessed one by one and placed in the buffer as follows: The @ElementListener method tests if the buffer size is greater or equal to the defined maxBatchSize. If it is, the records are processed. If not, then the current record is buffered.

  3. The previous step happens for all records of the group. Then the @AfterGroup method tests if the buffer is empty.

Group execution detail (values are examples):

Group batch processing

You can define the following logic in the processor configuration:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;

import javax.json.JsonObject;

import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.BeforeGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Processor;

@Processor(name = "BulkOutputDemo")
public class BulkProcessor implements Serializable {
    private Collection<JsonObject> buffer;

    @BeforeGroup
    public void begin() {
        buffer = new ArrayList<>();
    }

    @ElementListener
    public void bufferize(final JsonObject object) {
        buffer.add(object);
    }

    @AfterGroup
    public void commit() {
        // save buffered records at once (bulk)
    }
}

You can also use the condensed syntax for this kind of processor:

@Processor(name = "BulkOutputDemo")
public class BulkProcessor implements Serializable {

    @AfterGroup
    public void commit(final Collection<Record> records) {
        // save records
    }
}
When writing tests for your components, you can force the maxBatchSize parameter value by setting it with the following syntax: <configuration prefix>.$maxBatchSize=10.

You can learn more about processors in this document.

Scroll to top