When configuring a processor, it is a best practice to optimize the way your component processes records by implementing bulking.
By default, the runtime automatically estimates a group size that it considers good, according to the system capacity, to process the data. This group size can sometimes be too big and not optimal for your system to handle effectively and correctly, depending on your requirements. For example, if you need to process small batches of records.
To avoid this, you can use a maxBatchSize
with a value that you choose in order to commit before the end of the group is reached.
You don’t need to implement the maxBatchSize logic in your component configuration. It is automatically implemented when the component is deployed in Talend applications.
|
The batch processing is as follows:

In this diagram, you can see that:
-
The runtime estimates a group size of 4.
-
The component configuration specifies a
maxBatchSize
of 3.
Each group is processed one by one according to the group size estimated by the system:
-
The
@BeforeGroup
method resets a record buffer at the beginning of each group. -
The records of the group are assessed one by one and placed in the buffer as follows: The @ElementListener method tests if the buffer size is greater or equal to the defined
maxBatchSize
. If it is, the records are processed. If not, then the current record is buffered. -
The previous step happens for all records of the group. Then the
@AfterGroup
method tests if the buffer is empty. If there are still some records in the buffer, they are processed.
You can define the following logic in the processor configuration:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import javax.json.JsonObject;
import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.BeforeGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Processor;
@Processor(name = "BulkOutputDemo")
public class BulkProcessor implements Serializable {
private Collection<JsonObject> buffer;
@BeforeGroup
public void begin() {
buffer = new ArrayList<>();
}
@ElementListener
public void bufferize(final JsonObject object) {
buffer.add(object);
}
@AfterGroup
public void commit() {
// save buffer records at once (bulk)
}
}
You can learn more about processors in this document.