Depending on several requirements, including the system capacity and business needs, a processor can process records differently.
For example, for real-time or near-real time processing, it is more interesting to process small batches of data more often. On the other hand, in case of one-time processing, it is more optimal to adapt the way the component handles batches of data according to the system capacity.
By default, the runtime automatically estimates a group size that it considers good, according to the system capacity, to process the data. This group size can sometimes be too big and not optimal for your needs or for your system to handle effectively and correctly.
Users can then customize this size from the component settings in Talend Studio, by specifying a maxBatchSize
that adapts the size of each group of data to be processed.
The estimated group size logic is automatically implemented when a component is deployed to a Talend application. Besides defining the @BeforeGroup and @AfterGroup logic detailed below, no action is required on the implementation side of the component. |
The component batch processes the data as follows:
-
Case 1 - No
maxBatchSize
is specified in the component configuration. The runtime estimates a group size of 4. Records are processed by groups of 4. -
Case 2 - The runtime estimates a group size of 4 but a
maxBatchSize
of 3 is specified in the component configuration. The system adapts the group size to 3. Records are processed by groups of 3.
Each group is processed as follows until there is no record left:
-
The
@BeforeGroup
method resets a record buffer at the beginning of each group. -
The records of the group are assessed one by one and placed in the buffer as follows: The
@ElementListener
method tests if the buffer size is greater or equal to the definedmaxBatchSize
. If it is, the records are processed. If not, then the current record is buffered. -
The previous step happens for all records of the group. Then the
@AfterGroup
method tests if the buffer is empty.
You can define the following logic in the processor configuration:
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import javax.json.JsonObject;
import org.talend.sdk.component.api.processor.AfterGroup;
import org.talend.sdk.component.api.processor.BeforeGroup;
import org.talend.sdk.component.api.processor.ElementListener;
import org.talend.sdk.component.api.processor.Processor;
@Processor(name = "BulkOutputDemo")
public class BulkProcessor implements Serializable {
private Collection<JsonObject> buffer;
@BeforeGroup
public void begin() {
buffer = new ArrayList<>();
}
@ElementListener
public void bufferize(final JsonObject object) {
buffer.add(object);
}
@AfterGroup
public void commit() {
// save buffered records at once (bulk)
}
}
You can also use the condensed syntax for this kind of processor:
@Processor(name = "BulkOutputDemo")
public class BulkProcessor implements Serializable {
@AfterGroup
public void commit(final Collection<Record> records) {
// save records
}
}
You can learn more about processors in this document.